Compare commits

...

2 Commits

Author SHA1 Message Date
Erik 2c801453ab Fix flaky test 2026-06-11 08:14:12 +02:00
Erik 1ea8c5d037 Prime condition durations from history 2026-06-10 10:06:33 +02:00
8 changed files with 963 additions and 39 deletions
+1 -1
View File
@@ -460,7 +460,7 @@ class EventTrigger(Trigger):
listener = TargetCalendarEventListener(
self._hass, target_selection, self._event_type, offset, run_action
)
return listener.async_setup()
return await listener.async_setup()
class EventStartedTrigger(EventTrigger):
+1 -1
View File
@@ -137,7 +137,7 @@ class TimeRemainingTrigger(Trigger):
state = self._hass.states.get(entity_id)
schedule_for_state(entity_id, state, state.context if state else None)
unsub = async_track_target_selector_state_change_event(
unsub = await async_track_target_selector_state_change_event(
self._hass,
self._target,
state_change_listener,
+1 -1
View File
@@ -153,7 +153,7 @@ class ItemTriggerBase(Trigger, abc.ABC):
functools.partial(self._handle_item_change, run_action=run_action),
self._handle_entities_updated,
)
return listener.async_setup()
return await listener.async_setup()
@callback
@abc.abstractmethod
+159 -11
View File
@@ -1,6 +1,7 @@
"""Offer reusable conditions."""
import abc
import asyncio
from collections import deque
from collections.abc import Callable, Container, Coroutine, Generator, Iterable, Mapping
from contextlib import contextmanager
@@ -56,7 +57,7 @@ from homeassistant.const import (
STATE_UNKNOWN,
WEEKDAYS,
)
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.core import HomeAssistant, State, callback, split_entity_id
from homeassistant.exceptions import (
ConditionError,
ConditionErrorContainer,
@@ -87,6 +88,7 @@ from .automation import (
move_options_fields_to_top_level,
)
from .integration_platform import async_process_integration_platforms
from .recorder import get_instance
from .selector import (
NumericThresholdMode,
NumericThresholdSelector,
@@ -119,6 +121,16 @@ VALIDATE_CONFIG_FORMAT = "{}_validate_config"
_LOGGER = logging.getLogger(__name__)
# Upper bound on the best-effort recorder query used to prime `for:` durations
# at setup. If history can't be read within this window we fall back to the
# conservative live-state anchor rather than blocking condition setup.
HISTORY_PRIME_TIMEOUT = 10
# How far back the `for:` priming query reaches. Caps the cost of the query for
# very long `for:` durations; beyond this we rely on the live-state anchor, so
# such conditions may only become true once enough time has elapsed since setup.
MAX_HISTORY_PRIME_LOOKBACK = timedelta(hours=6)
_PLATFORM_ALIASES: dict[str | None, str | None] = {
"and": None,
"device": "device_automation",
@@ -493,6 +505,11 @@ class EntityConditionBase(Condition):
self._matcher = self._check_all_match_state
self._on_unload: list[Callable[[], None]] = []
self._valid_since: dict[str, datetime] = {}
# Entities whose `for:` anchor is currently being resolved from recorder
# history. While an entity is here the live listener leaves its anchor to
# the prime, except that an invalidation removes it (the run broke, so the
# in-flight history is stale and live tracking takes over).
self._priming: set[str] = set()
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities matching any of the domain specs."""
@@ -533,11 +550,19 @@ class EntityConditionBase(Condition):
and self._should_include(_state)
and self.is_valid_state(_state)
):
# While an entity is being primed from history, leave its anchor to
# the prime: the entity stayed valid, so the run is unbroken and the
# history start (which can be earlier than this update) is accurate.
if entity_id in self._priming:
return
# Only record the time if not already tracked, to avoid
# resetting the duration on unrelated state/attribute updates.
if entity_id not in self._valid_since:
self._valid_since[entity_id] = self._state_valid_since(_state)
else:
# An invalidation breaks the run, so any history being loaded for the
# entity is now stale; stop priming it and let live tracking own it.
self._priming.discard(entity_id)
self._valid_since.pop(entity_id, None)
@override
@@ -557,24 +582,147 @@ class EntityConditionBase(Condition):
self._update_valid_since(entity_id, to_state)
@callback
def _on_entities_update(added: set[str], removed: set[str]) -> None:
"""Handle changes to the tracked entity set."""
for entity_id in added:
self._update_valid_since(entity_id, self._hass.states.get(entity_id))
for entity_id in removed:
self._valid_since.pop(entity_id, None)
unsub = async_track_target_selector_state_change_event(
unsub = await async_track_target_selector_state_change_event(
self._hass,
self._target,
_state_change_listener,
self.entity_filter,
_on_entities_update,
self._async_on_entities_update,
primary_entities_only=self._primary_entities_only,
)
self._on_unload.append(unsub)
async def _async_on_entities_update(
self, added: set[str], removed: set[str]
) -> None:
"""Handle changes to the tracked entity set.
Removed entities stop being tracked immediately. Added entities are only
considered by the condition once their `for:` anchor has been resolved
(see `_async_prime_valid_since`); until then they are absent from
`_valid_since`. The target tracker awaits this for the initial entity set
at setup and runs it as a background task for later registry-driven
changes.
"""
for entity_id in removed:
self._priming.discard(entity_id)
self._valid_since.pop(entity_id, None)
await self._async_prime_valid_since(added)
async def _async_prime_valid_since(self, entity_ids: set[str]) -> None:
"""Resolve and store the `for:` anchor for newly tracked entities.
For each currently-valid entity the anchor is the start of its current
continuous run of validity, read from recorder history (bounded by
`MAX_HISTORY_PRIME_LOOKBACK`) and combined with the current state's
anchor, or the current state's anchor alone when the recorder is
unavailable or the read fails. An entity is added to `_valid_since` only
once this resolves, so a newly tracked entity does not participate in the
condition until its anchor is known — rather than briefly using a
conservative anchor that then changes.
While loading, an entity is held in `_priming`. A live change that keeps
it valid is ignored (the run is unbroken, history is accurate), but an
invalidation removes it from `_priming` so that we do not apply now-stale
history over the live tracking that observed the break.
"""
# Conservative anchor from the live state for each currently-valid entity.
anchors = {
entity_id: self._state_valid_since(_state)
for entity_id in entity_ids
if (_state := self._hass.states.get(entity_id)) is not None
and self._should_include(_state)
and self.is_valid_state(_state)
}
if not anchors:
return
self._priming.update(anchors)
try:
if "recorder" in self._hass.config.components:
await self._async_refine_anchors_from_history(anchors)
for entity_id, anchor in anchors.items():
# Skip entities a live change invalidated mid-load: they were
# removed from `_priming`, the run broke, and live tracking (which
# saw the break) owns them — applying this history would be stale.
if entity_id in self._priming:
self._valid_since[entity_id] = anchor
finally:
self._priming.difference_update(anchors)
async def _async_refine_anchors_from_history(
self, anchors: dict[str, datetime]
) -> None:
"""Move each anchor in `anchors` back to the true start of its run.
Mutates `anchors` in place.
"""
from sqlalchemy.exc import SQLAlchemyError # noqa: PLC0415
from homeassistant.components.recorder import history # noqa: PLC0415
if TYPE_CHECKING:
assert self._duration is not None
lookback = min(self._duration, MAX_HISTORY_PRIME_LOOKBACK)
start_time = dt_util.utcnow() - lookback
instance = get_instance(self._hass)
try:
async with asyncio.timeout(HISTORY_PRIME_TIMEOUT):
# The history query only sees committed rows. Wait for the
# recorder to flush its queue first.
if (commit_future := instance.async_get_commit_future()) is not None:
await commit_future
historical_states = await instance.async_add_executor_job(
ft.partial(
history.get_significant_states,
self._hass,
start_time,
entity_ids=list(anchors),
include_start_time_state=True,
# Mandatory: the default (True) drops attribute-only
# changes for entities outside SIGNIFICANT_DOMAINS, which
# are exactly the transitions attribute-based conditions
# depend on.
significant_changes_only=False,
minimal_response=False,
)
)
except (SQLAlchemyError, TimeoutError) as err:
# Best effort: keep the conservative anchors rather than failing.
_LOGGER.debug("Error priming condition durations from history: %s", err)
return
for entity_id, rows in historical_states.items():
valid_since = self._valid_since_from_history(
entity_id, cast(list[State], rows)
)
if valid_since is not None:
anchors[entity_id] = min(valid_since, anchors[entity_id])
def _valid_since_from_history(
self, entity_id: str, rows: Iterable[State]
) -> datetime | None:
"""Return when the current continuous run of valid states began.
Walks historical states oldest-first and returns the anchor time of the
first state in the most recent unbroken run of valid states, or None if
the most recently recorded state is not valid (e.g. the recorder lags
behind the live state machine).
"""
# Recorder rows are LazyState objects, which skip State.__init__ and so
# never populate the domain/object_id that the validity checks rely on.
domain, object_id = split_entity_id(entity_id)
valid_since: datetime | None = None
for _state in rows:
_state.domain = domain
_state.object_id = object_id
if self._should_include(_state) and self.is_valid_state(_state):
if valid_since is None:
valid_since = self._state_valid_since(_state)
else:
valid_since = None
return valid_since
@override
def _async_unload(self) -> None:
"""Unsubscribe from listeners."""
+74 -15
View File
@@ -1,7 +1,8 @@
"""Helpers for dealing with entity targets."""
import abc
from collections.abc import Callable
import asyncio
from collections.abc import Callable, Coroutine
import dataclasses
import logging
from logging import Logger
@@ -292,7 +293,7 @@ class TargetEntityChangeTracker(abc.ABC):
self._registry_unsubs: list[CALLBACK_TYPE] = []
def async_setup(self) -> Callable[[], None]:
async def async_setup(self) -> Callable[[], None]:
"""Set up the state change tracking."""
self._setup_registry_listeners()
self._handle_target_update()
@@ -304,18 +305,20 @@ class TargetEntityChangeTracker(abc.ABC):
"""Called when there's an update to tracked target entities."""
@callback
def _handle_target_update(self, event: Event[Any] | None = None) -> None:
"""Handle updates in the tracked targets."""
def _referenced_entities(self) -> set[str]:
"""Return the currently tracked, filtered entity ids."""
selected = async_extract_referenced_entity_ids(
self._hass,
self._target_selection,
expand_group=False,
primary_entities_only=self._primary_entities_only,
)
filtered_entities = self._entity_filter(
selected.referenced | selected.indirectly_referenced
)
self._handle_entities_update(filtered_entities)
return self._entity_filter(selected.referenced | selected.indirectly_referenced)
@callback
def _handle_target_update(self, event: Event[Any] | None = None) -> None:
"""Handle updates in the tracked targets."""
self._handle_entities_update(self._referenced_entities())
def _setup_registry_listeners(self) -> None:
"""Set up listeners for registry changes that require resubscription."""
@@ -356,11 +359,20 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
target_selection: TargetSelection,
action: Callable[[TargetStateChangedData], Any],
entity_filter: Callable[[set[str]], set[str]],
on_entities_update: Callable[[set[str], set[str]], None] | None = None,
on_entities_update: Callable[
[set[str], set[str]], Coroutine[Any, Any, None] | None
]
| None = None,
*,
primary_entities_only: bool = True,
) -> None:
"""Initialize the state change tracker."""
"""Initialize the state change tracker.
`on_entities_update` may be a plain callback or a coroutine function.
A coroutine is awaited for the initial entity set (so setup is
deterministic) and scheduled as a background task for later
registry-driven changes.
"""
super().__init__(
hass,
target_selection,
@@ -371,17 +383,55 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
self._on_entities_update = on_entities_update
self._state_change_unsub: CALLBACK_TYPE | None = None
self._tracked_entities: set[str] = set()
self._update_tasks: set[asyncio.Task[None]] = set()
async def async_setup(self) -> Callable[[], None]:
"""Set up tracking, awaiting the update for the initial entity set.
The initial update is awaited so that a coroutine `on_entities_update`
(e.g. one that loads history) completes before setup returns. Later
registry-driven updates instead arrive via the callback
`_handle_entities_update` and are scheduled as background tasks.
"""
self._setup_registry_listeners()
entities = self._referenced_entities()
if (coro := self._apply_entities_update(entities)) is not None:
await coro
return self._unsubscribe
@callback
def _handle_entities_update(self, tracked_entities: set[str]) -> None:
"""Handle the tracked entities."""
"""Handle a registry-driven change to the tracked entity set."""
if (coro := self._apply_entities_update(tracked_entities)) is None:
return
# Track the task so it can be cancelled on unsubscribe and so its
# exception is retrieved (and logged) rather than surfacing only via
# asyncio's "Task exception was never retrieved" at garbage collection.
task = self._hass.async_create_task(coro, "Target entity tracker update")
self._update_tasks.add(task)
task.add_done_callback(self._on_update_task_done)
def _on_update_task_done(self, task: asyncio.Task[None]) -> None:
"""Drop a finished update task and surface any unexpected error."""
self._update_tasks.discard(task)
if not task.cancelled() and (exc := task.exception()) is not None:
_LOGGER.error(
"Error handling tracked entities update: %s", exc, exc_info=exc
)
def _apply_entities_update(
self, tracked_entities: set[str]
) -> Coroutine[Any, Any, None] | None:
"""Resubscribe to state changes; return the update coroutine, if any."""
previous_entities = self._tracked_entities
self._tracked_entities = tracked_entities
result: Coroutine[Any, Any, None] | None = None
if self._on_entities_update is not None:
added = tracked_entities - previous_entities
removed = previous_entities - tracked_entities
if added or removed:
self._on_entities_update(added, removed)
result = self._on_entities_update(added, removed)
@callback
def state_change_listener(event: Event[EventStateChangedData]) -> None:
@@ -395,6 +445,7 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
self._state_change_unsub = async_track_state_change_event(
self._hass, tracked_entities, state_change_listener
)
return result
def _unsubscribe(self) -> None:
"""Unsubscribe from all events."""
@@ -402,14 +453,18 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
if self._state_change_unsub:
self._state_change_unsub()
self._state_change_unsub = None
for task in self._update_tasks:
task.cancel()
self._update_tasks.clear()
def async_track_target_selector_state_change_event(
async def async_track_target_selector_state_change_event(
hass: HomeAssistant,
target_selector_config: ConfigType,
action: Callable[[TargetStateChangedData], Any],
entity_filter: Callable[[set[str]], set[str]] = lambda x: x,
on_entities_update: Callable[[set[str], set[str]], None] | None = None,
on_entities_update: Callable[[set[str], set[str]], Coroutine[Any, Any, None] | None]
| None = None,
*,
primary_entities_only: bool = True,
) -> CALLBACK_TYPE:
@@ -419,6 +474,10 @@ def async_track_target_selector_state_change_event(
When `primary_entities_only` is True, indirect target
expansion (via device, area, and floor) skips entities
with an `entity_category` (config or diagnostic entities).
`on_entities_update` may be a coroutine function; it is awaited for the
initial entity set and scheduled as a task for later registry-driven
changes, so this function must itself be awaited.
"""
target_selection = TargetSelection(target_selector_config)
if not target_selection.has_any_target:
@@ -435,4 +494,4 @@ def async_track_target_selector_state_change_event(
on_entities_update,
primary_entities_only=primary_entities_only,
)
return tracker.async_setup()
return await tracker.async_setup()
+1 -1
View File
@@ -579,7 +579,7 @@ class EntityTriggerBase(Trigger):
),
)
unsub = async_track_target_selector_state_change_event(
unsub = await async_track_target_selector_state_change_event(
self._hass,
self._target,
state_change_listener,
+617 -4
View File
@@ -1,5 +1,6 @@
"""Test the condition helper."""
import asyncio
from collections.abc import Mapping
from contextlib import AbstractContextManager, nullcontext as does_not_raise
from dataclasses import dataclass, field
@@ -13,12 +14,14 @@ from freezegun import freeze_time
from freezegun.api import FrozenDateTimeFactory
import pytest
from pytest_unordered import unordered
from sqlalchemy.exc import SQLAlchemyError
import voluptuous as vol
from homeassistant.components.device_automation import (
DOMAIN as DEVICE_AUTOMATION_DOMAIN,
)
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
from homeassistant.components.recorder import Recorder, get_instance, history
from homeassistant.components.sensor import SensorDeviceClass
from homeassistant.components.sun import DOMAIN as SUN_DOMAIN
from homeassistant.components.system_health import DOMAIN as SYSTEM_HEALTH_DOMAIN
@@ -60,6 +63,7 @@ from homeassistant.helpers.condition import (
BEHAVIOR_ALL,
BEHAVIOR_ANY,
CONDITIONS,
MAX_HISTORY_PRIME_LOOKBACK,
Condition,
ConditionChecker,
EntityConditionBase,
@@ -79,6 +83,7 @@ from homeassistant.util.unit_conversion import TemperatureConverter
from homeassistant.util.yaml.loader import parse_yaml
from tests.common import MockModule, MockPlatform, mock_integration, mock_platform
from tests.components.recorder.common import async_wait_recording_done
from tests.typing import WebSocketGenerator
@@ -5074,12 +5079,15 @@ async def test_state_condition_attr_duration_initial_state(
duration: int,
initially_met: bool,
) -> None:
"""Test attribute-based condition initialization from existing state.
"""Test attribute-based condition initialization without a recorder.
The condition uses last_updated (not last_changed) to determine how long
an attribute-based condition has been true. This is conservative: when
With no recorder available the condition falls back to anchoring `for:`
durations to the current state's last_updated. This is conservative: when
the main state changes but the tracked attribute stays the same,
last_updated is bumped and the effective duration resets.
last_updated is bumped and the effective duration resets (see the
`state_change_bumps_last_updated_not_met` case). The recorder-backed
variant in test_state_condition_attr_duration_initial_state_from_history
refines this from real history.
"""
for step in steps:
freezer.tick(timedelta(seconds=step.delay_before))
@@ -5313,6 +5321,611 @@ async def test_state_condition_attr_duration_unrelated_attr_update(
assert test.async_check() is True
async def _record_attr_steps(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
start: datetime,
entity_id: str,
steps: list[_AttrInitStep],
) -> int:
"""Record a series of state writes into the recorder at controlled times.
Returns the number of seconds elapsed from `start` to the final write.
"""
elapsed = 0
for step in steps:
elapsed += step.delay_before
freezer.move_to(start + timedelta(seconds=elapsed))
hass.states.async_set(entity_id, step.state, step.attrs)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
return elapsed
@pytest.mark.parametrize(
("steps", "wait_before_setup", "initially_met"),
[
# Valid the entire time → met (10s >= 5s).
([_AttrInitStep(STATE_ON, {"test_attr": True})], 10, True),
# Valid for less than the `for:` window → not met (3s < 5s).
([_AttrInitStep(STATE_ON, {"test_attr": True})], 3, False),
# The tracked attribute stayed valid across an unrelated main-state
# change. The OFF write bumps last_updated, but history shows the
# attribute never left the valid range → met. This is exactly the case
# the conservative last_updated anchor reports wrong (it returns False;
# see test_state_condition_attr_duration_initial_state).
(
[
_AttrInitStep(STATE_ON, {"test_attr": True}),
_AttrInitStep(STATE_OFF, {"test_attr": True}, delay_before=8),
],
2,
True,
),
# Invalid, then valid 6s before setup → met (6s >= 5s).
(
[
_AttrInitStep(STATE_ON, {"test_attr": False}),
_AttrInitStep(STATE_ON, {"test_attr": True}, delay_before=4),
],
6,
True,
),
# Invalid, then valid only 4s before setup → not met (4s < 5s).
(
[
_AttrInitStep(STATE_ON, {"test_attr": False}),
_AttrInitStep(STATE_ON, {"test_attr": True}, delay_before=6),
],
4,
False,
),
],
ids=[
"valid_long_enough",
"valid_too_short",
"valid_across_state_change",
"invalid_then_valid_met",
"invalid_then_valid_not_met",
],
)
async def test_state_condition_attr_duration_initial_state_from_history(
recorder_mock: Recorder,
hass: HomeAssistant,
steps: list[_AttrInitStep],
wait_before_setup: int,
initially_met: bool,
) -> None:
"""Test attribute-based `for:` priming from recorder history.
With the recorder available, the condition walks recent history to find
when the tracked value actually entered its current continuous valid run,
rather than conservatively anchoring to the current state's last_updated.
The `valid_across_state_change` case is the key improvement: an unrelated
main-state change no longer resets the duration.
"""
entity_id = "test.entity_1"
start = dt_util.utcnow()
with freeze_time(start) as freezer:
elapsed = await _record_attr_steps(hass, freezer, start, entity_id, steps)
freezer.move_to(start + timedelta(seconds=elapsed + wait_before_setup))
test = await _setup_attr_state_condition(
hass,
entity_ids=entity_id,
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
assert test.async_check() is initially_met
async def test_state_condition_attr_duration_history_includes_attr_only_changes(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Attribute-only invalidations inside the window must reset the timer.
The tracked value dips invalid and recovers through attribute-only changes
(the main state stays ON throughout). Those rows are only returned when the
history query passes significant_changes_only=False; were they dropped, the
window would look continuously valid and the condition would wrongly report
the `for:` duration as met.
"""
entity_id = "test.entity_1"
start = dt_util.utcnow()
steps = [
_AttrInitStep(STATE_ON, {"test_attr": True}),
_AttrInitStep(STATE_ON, {"test_attr": False}, delay_before=6),
_AttrInitStep(STATE_ON, {"test_attr": True}, delay_before=2),
]
with freeze_time(start) as freezer:
elapsed = await _record_attr_steps(hass, freezer, start, entity_id, steps)
freezer.move_to(start + timedelta(seconds=elapsed + 2))
test = await _setup_attr_state_condition(
hass,
entity_ids=entity_id,
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
# Valid only for the last 2s (since the recovery at t=8); the dip to
# invalid at t=6 falls inside the 5s window → not met.
assert test.async_check() is False
@pytest.mark.parametrize(
("behavior", "expected"),
[(BEHAVIOR_ANY, True), (BEHAVIOR_ALL, False)],
)
async def test_state_condition_attr_duration_from_history_multiple_entities(
recorder_mock: Recorder,
hass: HomeAssistant,
behavior: str,
expected: bool,
) -> None:
"""History priming covers every targeted entity in a single query.
entity_1 has been valid long enough; entity_2 only recently became valid,
so `any` passes while `all` does not.
"""
start = dt_util.utcnow()
with freeze_time(start) as freezer:
hass.states.async_set("test.entity_1", STATE_ON, {"test_attr": True})
hass.states.async_set("test.entity_2", STATE_ON, {"test_attr": False})
await hass.async_block_till_done()
freezer.move_to(start + timedelta(seconds=7))
hass.states.async_set("test.entity_2", STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
freezer.move_to(start + timedelta(seconds=10))
test = await _setup_attr_state_condition(
hass,
entity_ids=["test.entity_1", "test.entity_2"],
states={True},
condition_options={ATTR_BEHAVIOR: behavior, CONF_FOR: {"seconds": 5}},
)
# entity_1 valid for 10s (met); entity_2 valid for only 3s (not met).
assert test.async_check() is expected
@pytest.mark.parametrize(
"history_error",
[SQLAlchemyError("boom"), TimeoutError()],
ids=["db_error", "timeout"],
)
async def test_state_condition_attr_duration_history_error_falls_back(
recorder_mock: Recorder,
hass: HomeAssistant,
history_error: Exception,
) -> None:
"""A failing/slow history query must not break setup; it falls back.
The tracked attribute stayed valid across an unrelated main-state change,
so a successful history read would report the duration as met. When the
query errors or times out, the condition keeps the conservative
last_updated anchor (set when the tracker was wired up) instead, which here
reports not met — and crucially, setup does not raise.
"""
entity_id = "test.entity_1"
start = dt_util.utcnow()
with freeze_time(start) as freezer:
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
freezer.move_to(start + timedelta(seconds=8))
hass.states.async_set(entity_id, STATE_OFF, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
freezer.move_to(start + timedelta(seconds=10))
with patch(
"homeassistant.components.recorder.history.get_significant_states",
side_effect=history_error,
):
test = await _setup_attr_state_condition(
hass,
entity_ids=entity_id,
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
# Fell back to the conservative anchor (last_updated bumped at t=8),
# so the 5s `for:` is not satisfied 2s later.
assert test.async_check() is False
async def test_state_condition_attr_duration_history_lookback_capped(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""The history lookback is capped, regardless of a longer `for:` duration."""
entity_id = "test.entity_1"
start = dt_util.utcnow()
with freeze_time(start):
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
captured: dict[str, datetime] = {}
def _capture(hass_: HomeAssistant, start_time: datetime, **kwargs: Any) -> dict:
captured["start_time"] = start_time
return {}
with patch(
"homeassistant.components.recorder.history.get_significant_states",
side_effect=_capture,
):
await _setup_attr_state_condition(
hass,
entity_ids=entity_id,
states={True},
condition_options={CONF_FOR: {"hours": 8}},
)
# The 8h `for:` is clamped to the 6h cap.
assert dt_util.utcnow() - captured["start_time"] == MAX_HISTORY_PRIME_LOOKBACK
async def test_state_condition_attr_duration_history_long_for_uses_live_anchor(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""A `for:` longer than the lookback cap still uses the live anchor.
The entity has been valid for 10h (longer than the 6h history cap). History
alone can only prove the last 6h, but the live state's last_updated (10h
ago, never changed) proves the full run, so the 8h `for:` is met. This
requires combining history with the live anchor rather than overriding it.
"""
entity_id = "test.entity_1"
start = dt_util.utcnow()
with freeze_time(start) as freezer:
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
freezer.move_to(start + timedelta(hours=10))
test = await _setup_attr_state_condition(
hass,
entity_ids=entity_id,
states={True},
condition_options={CONF_FOR: {"hours": 8}},
)
assert test.async_check() is True
async def test_state_condition_attr_duration_history_loaded_for_added_entity(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""History is loaded for an entity added to the target after setup.
The entity is only tracked once it gains the targeted label. Resolving its
anchor runs in a background task, and the entity is not counted until that
completes (no interim conservative anchor). Once loaded, its anchor comes
from history just like the initial set: the attribute stayed valid across an
unrelated main-state change, so the duration is met even though the live
last_updated anchor alone (bumped by the OFF write) would report not met.
"""
label_reg = lr.async_get(hass)
label = label_reg.async_create("Test Late History")
entity_reg = er.async_get(hass)
entry = entity_reg.async_get_or_create(
domain="test", platform="test", unique_id="late_history"
)
entity_id = entry.entity_id
start = dt_util.utcnow()
with freeze_time(start) as freezer:
# Valid since t=0; unrelated main-state change at t=8 keeps the attr valid.
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
freezer.move_to(start + timedelta(seconds=8))
hass.states.async_set(entity_id, STATE_OFF, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
# The entity has no label yet, so it is not tracked at setup.
freezer.move_to(start + timedelta(seconds=10))
test = await _setup_attr_state_condition_with_target(
hass,
target={ATTR_LABEL_ID: label.label_id},
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
assert test.async_check() is False
# Adding the label tracks the entity, but its anchor is resolved in a
# background task. Until that completes the entity has no _valid_since
# entry and is not counted yet — even though it will be met once loaded.
# Hold the recorder flush open so the load deterministically cannot
# finish before the intermediate check.
instance = get_instance(hass)
gate: asyncio.Future[None] = hass.loop.create_future()
with patch.object(instance, "async_get_commit_future", return_value=gate):
entity_reg.async_update_entity(entity_id, labels={label.label_id})
assert test.async_check() is False
# Release the flush; the query runs and the anchor is stored.
gate.set_result(None)
await hass.async_block_till_done()
# History loaded: continuously valid for 10s → 5s `for:` is met.
assert test.async_check() is True
async def test_state_condition_attr_duration_not_counted_while_history_loads(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""The known gap: a new entity is not counted while its history loads.
Resolving a newly tracked entity's `for:` anchor is asynchronous. Until the
recorder read completes the entity has no `_valid_since` entry, so the
condition does not count it — even though it will be met once loaded. This
holds the recorder read open to observe that window deterministically.
"""
label_reg = lr.async_get(hass)
label = label_reg.async_create("Loading Gap")
entity_reg = er.async_get(hass)
entry = entity_reg.async_get_or_create(
domain="test", platform="test", unique_id="loading_gap"
)
entity_id = entry.entity_id
start = dt_util.utcnow()
with freeze_time(start) as freezer:
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
# Valid for 10s by the time it is added, so once loaded the 5s `for:`
# is met.
freezer.move_to(start + timedelta(seconds=10))
test = await _setup_attr_state_condition_with_target(
hass,
target={ATTR_LABEL_ID: label.label_id},
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
assert test.async_check() is False
# Hold the recorder flush open so the background history load can't
# finish, then add the entity to the target.
instance = get_instance(hass)
gate: asyncio.Future[None] = hass.loop.create_future()
with patch.object(instance, "async_get_commit_future", return_value=gate):
entity_reg.async_update_entity(entity_id, labels={label.label_id})
# Let the prime task start and block on the held flush.
await asyncio.sleep(0)
# Load in flight → no anchor yet → entity not counted.
assert test.async_check() is False
# Release the flush; the query runs and the anchor is stored.
gate.set_result(None)
await hass.async_block_till_done()
# Loaded now → met.
assert test.async_check() is True
async def test_state_condition_attr_duration_benign_change_during_load_keeps_history(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""A valid live change during the load does not discard history.
If the entity stays valid while its history loads, the run is unbroken and
history's earlier run-start is still accurate, so it is applied. An unrelated
attribute write must not reset the anchor to "now".
"""
label_reg = lr.async_get(hass)
label = label_reg.async_create("Benign During Load")
entity_reg = er.async_get(hass)
entry = entity_reg.async_get_or_create(
domain="test", platform="test", unique_id="benign_during_load"
)
entity_id = entry.entity_id
start = dt_util.utcnow()
with freeze_time(start) as freezer:
# Valid since t=0; history anchors here, well past the 5s `for:`.
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
freezer.move_to(start + timedelta(seconds=10))
test = await _setup_attr_state_condition_with_target(
hass,
target={ATTR_LABEL_ID: label.label_id},
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
instance = get_instance(hass)
gate: asyncio.Future[None] = hass.loop.create_future()
with patch.object(instance, "async_get_commit_future", return_value=gate):
entity_reg.async_update_entity(entity_id, labels={label.label_id})
await asyncio.sleep(0) # prime task blocks on the held flush
# Unrelated attribute write while loading: still valid (run unbroken).
hass.states.async_set(
entity_id, STATE_ON, {"test_attr": True, "other": "x"}
)
await asyncio.sleep(0)
# Advance so the change-time anchor alone would satisfy the 5s `for:`.
# The entity must still not be counted while its history is loading —
# the live listener leaves primed entities alone, so there is no
# interim anchor for the change to set.
freezer.move_to(start + timedelta(seconds=18))
assert test.async_check() is False
gate.set_result(None)
await hass.async_block_till_done()
# History was applied despite the benign change → valid since t=0 → met.
assert test.async_check() is True
async def test_state_condition_attr_duration_invalidation_during_load_discards_history(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""An invalidation during the load discards the (now stale) history.
The commit flush only guarantees history up to the flush point; a dip that
commits after it is invisible to the query, so history would still show the
old continuous run. The live listener saw the break, so on revalidation the
anchor comes from live tracking (the post-dip time), not history.
"""
label_reg = lr.async_get(hass)
label = label_reg.async_create("Dip During Load")
entity_reg = er.async_get(hass)
entry = entity_reg.async_get_or_create(
domain="test", platform="test", unique_id="dip_during_load"
)
entity_id = entry.entity_id
start = dt_util.utcnow()
with freeze_time(start) as freezer:
# Valid since t=0; history alone would (stalely) anchor here and report
# the 5s `for:` as met.
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
await async_wait_recording_done(hass)
freezer.move_to(start + timedelta(seconds=10))
test = await _setup_attr_state_condition_with_target(
hass,
target={ATTR_LABEL_ID: label.label_id},
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
instance = get_instance(hass)
gate: asyncio.Future[None] = hass.loop.create_future()
with patch.object(instance, "async_get_commit_future", return_value=gate):
entity_reg.async_update_entity(entity_id, labels={label.label_id})
await asyncio.sleep(0) # prime task blocks on the held flush
# Dip invalid then valid again while loading: the run broke.
hass.states.async_set(entity_id, STATE_ON, {"test_attr": False})
await asyncio.sleep(0)
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await asyncio.sleep(0)
gate.set_result(None)
await hass.async_block_till_done()
# Stale history was discarded; the anchor is the post-dip time (now), so
# the 5s `for:` is not yet met.
assert test.async_check() is False
async def test_state_condition_attr_duration_history_flushes_before_query(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""Pending recorder writes are flushed before the history query.
`get_significant_states` only sees committed rows. A state change that
already happened but is still queued in the recorder would be missed by
both the query and the live listener (which only sees changes after it
subscribes), so the queue must be flushed before querying.
"""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
await hass.async_block_till_done()
call_order: list[str] = []
instance = get_instance(hass)
real_commit_future = instance.async_get_commit_future
real_query = history.get_significant_states
def _spy_commit_future() -> Any:
call_order.append("flush")
return real_commit_future()
def _spy_query(*args: Any, **kwargs: Any) -> Any:
call_order.append("query")
return real_query(*args, **kwargs)
with (
patch.object(instance, "async_get_commit_future", _spy_commit_future),
patch(
"homeassistant.components.recorder.history.get_significant_states",
_spy_query,
),
):
await _setup_attr_state_condition(
hass,
entity_ids=entity_id,
states={True},
condition_options={CONF_FOR: {"seconds": 5}},
)
assert call_order == ["flush", "query"]
async def test_state_condition_multi_state_duration_uses_history(
recorder_mock: Recorder, hass: HomeAssistant
) -> None:
"""A multi-state condition reads history to anchor across in-set toggles.
A transition within the valid set (here ON->OFF) bumps `last_changed` even
though the condition stays valid, so `last_changed` alone is too
conservative; history finds the true start of the run.
"""
entity_id = "test.entity_1"
start = dt_util.utcnow()
with freeze_time(start) as freezer:
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# Toggle within the valid set: still valid, but last_changed jumps to t=8.
freezer.move_to(start + timedelta(seconds=8))
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
freezer.move_to(start + timedelta(seconds=10))
test = await _setup_state_condition(
hass,
states={STATE_ON, STATE_OFF},
target_config={CONF_ENTITY_ID: [entity_id]},
condition_options={CONF_FOR: {"seconds": 5}},
)
# Valid (ON or OFF) for 10s. last_changed alone (t=8) would report not
# met; history anchors to the start of the run, so the 5s `for:` is met.
assert test.async_check() is True
async def test_state_condition_single_state_duration_skips_history(
recorder_mock: Recorder,
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
) -> None:
"""A single-state condition uses last_changed directly and reads no history.
`_needs_duration_tracking` is False for single-state, no-value_source
conditions, so setup never sets up tracking or queries the recorder.
"""
hass.states.async_set("test.entity_1", STATE_ON)
await hass.async_block_till_done()
with patch(
"homeassistant.components.recorder.history.get_significant_states",
return_value={},
) as mock_history:
test = await _setup_state_condition(
hass,
states=STATE_ON,
target_config={CONF_ENTITY_ID: ["test.entity_1"]},
condition_options={CONF_FOR: {"seconds": 5}},
)
mock_history.assert_not_called()
# The anchor comes straight from state.last_changed, so the duration is met.
freezer.tick(timedelta(seconds=6))
assert test.async_check() is True
class _AttributeBackedStateCondition(EntityConditionBase):
"""Test condition that reads an attribute directly in `is_valid_state`.
+109 -5
View File
@@ -1,5 +1,7 @@
"""Test service helpers."""
import asyncio
import pytest
from homeassistant.components.group import Group
@@ -544,7 +546,7 @@ async def test_async_track_target_selector_state_change_event_empty_selector(
"""Handle state change events."""
with pytest.raises(HomeAssistantError) as excinfo:
target.async_track_target_selector_state_change_event(
await target.async_track_target_selector_state_change_event(
hass, {}, state_change_callback
)
assert str(excinfo.value) == (
@@ -626,7 +628,7 @@ async def test_async_track_target_selector_state_change_event(
ATTR_FLOOR_ID: floor,
ATTR_LABEL_ID: label,
}
unsub = target.async_track_target_selector_state_change_event(
unsub = await target.async_track_target_selector_state_change_event(
hass, selector_config, state_change_callback
)
@@ -762,7 +764,7 @@ async def test_async_track_target_selector_state_change_event_filter(
ATTR_ENTITY_ID: targeted_entity,
ATTR_LABEL_ID: label,
}
unsub = target.async_track_target_selector_state_change_event(
unsub = await target.async_track_target_selector_state_change_event(
hass, selector_config, state_change_callback, entity_filter
)
@@ -835,7 +837,7 @@ async def test_async_track_target_selector_state_change_event_on_entities_update
hass.states.async_set(entity_b.entity_id, STATE_ON)
await hass.async_block_till_done()
unsub = target.async_track_target_selector_state_change_event(
unsub = await target.async_track_target_selector_state_change_event(
hass,
{ATTR_LABEL_ID: label.label_id},
state_change_callback,
@@ -889,6 +891,108 @@ async def test_async_track_target_selector_state_change_event_on_entities_update
assert len(entity_updates) == 0
async def test_async_track_target_selector_update_task_error_is_logged(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""An error in a registry-driven async on_entities_update is logged.
Such updates run as background tasks; an unexpected exception must be
retrieved and logged rather than left for asyncio to surface at GC time.
"""
@callback
def state_change_callback(event: target.TargetStateChangedData) -> None:
"""Handle state change events."""
async def on_entities_update(added: set[str], removed: set[str]) -> None:
raise ValueError("boom")
entity_reg = er.async_get(hass)
label_reg = lr.async_get(hass)
label = label_reg.async_create("Error Test")
entity = entity_reg.async_get_or_create(
domain="light", platform="test", unique_id="err_a"
)
hass.states.async_set(entity.entity_id, STATE_ON)
await hass.async_block_till_done()
# No entity has the label yet, so the awaited initial update does not fire
# and setup succeeds.
unsub = await target.async_track_target_selector_state_change_event(
hass,
{ATTR_LABEL_ID: label.label_id},
state_change_callback,
on_entities_update=on_entities_update,
)
# A registry change adds the entity, so the async callback runs as a task
# and raises; the error is logged, not swallowed.
entity_reg.async_update_entity(entity.entity_id, labels={label.label_id})
await hass.async_block_till_done()
assert "Error handling tracked entities update" in caplog.text
assert "boom" in caplog.text
unsub()
async def test_async_track_target_selector_cancels_update_task_on_unsubscribe(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Unsubscribing cancels an in-flight registry-driven update task."""
started = asyncio.Event()
release = asyncio.Event() # intentionally never set
cancelled = False
@callback
def state_change_callback(event: target.TargetStateChangedData) -> None:
"""Handle state change events."""
async def on_entities_update(added: set[str], removed: set[str]) -> None:
nonlocal cancelled
started.set()
try:
await release.wait()
except asyncio.CancelledError:
cancelled = True
raise
entity_reg = er.async_get(hass)
label_reg = lr.async_get(hass)
label = label_reg.async_create("Cancel Test")
entity = entity_reg.async_get_or_create(
domain="light", platform="test", unique_id="cancel_a"
)
hass.states.async_set(entity.entity_id, STATE_ON)
await hass.async_block_till_done()
# No entity has the label yet, so the awaited initial update does not fire.
unsub = await target.async_track_target_selector_state_change_event(
hass,
{ATTR_LABEL_ID: label.label_id},
state_change_callback,
on_entities_update=on_entities_update,
)
# Registry change starts the update task, which blocks indefinitely.
entity_reg.async_update_entity(entity.entity_id, labels={label.label_id})
await started.wait()
# Unsubscribing cancels the in-flight task.
unsub()
await asyncio.sleep(0)
assert cancelled is True
# A cancellation is not an error.
assert "Error handling tracked entities update" not in caplog.text
# Drain (a no-op once cancelled; releases the task if cancellation regressed).
release.set()
await hass.async_block_till_done()
async def test_async_track_target_selector_no_on_entities_update(
hass: HomeAssistant,
) -> None:
@@ -904,7 +1008,7 @@ async def test_async_track_target_selector_no_on_entities_update(
await hass.async_block_till_done()
# No on_entities_update — should work without errors
unsub = target.async_track_target_selector_state_change_event(
unsub = await target.async_track_target_selector_state_change_event(
hass,
{ATTR_ENTITY_ID: entity_id},
state_change_callback,