mirror of
https://github.com/home-assistant/core.git
synced 2026-06-11 11:41:42 +02:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2c801453ab | |||
| 1ea8c5d037 |
@@ -460,7 +460,7 @@ class EventTrigger(Trigger):
|
||||
listener = TargetCalendarEventListener(
|
||||
self._hass, target_selection, self._event_type, offset, run_action
|
||||
)
|
||||
return listener.async_setup()
|
||||
return await listener.async_setup()
|
||||
|
||||
|
||||
class EventStartedTrigger(EventTrigger):
|
||||
|
||||
@@ -137,7 +137,7 @@ class TimeRemainingTrigger(Trigger):
|
||||
state = self._hass.states.get(entity_id)
|
||||
schedule_for_state(entity_id, state, state.context if state else None)
|
||||
|
||||
unsub = async_track_target_selector_state_change_event(
|
||||
unsub = await async_track_target_selector_state_change_event(
|
||||
self._hass,
|
||||
self._target,
|
||||
state_change_listener,
|
||||
|
||||
@@ -153,7 +153,7 @@ class ItemTriggerBase(Trigger, abc.ABC):
|
||||
functools.partial(self._handle_item_change, run_action=run_action),
|
||||
self._handle_entities_updated,
|
||||
)
|
||||
return listener.async_setup()
|
||||
return await listener.async_setup()
|
||||
|
||||
@callback
|
||||
@abc.abstractmethod
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
"""Offer reusable conditions."""
|
||||
|
||||
import abc
|
||||
import asyncio
|
||||
from collections import deque
|
||||
from collections.abc import Callable, Container, Coroutine, Generator, Iterable, Mapping
|
||||
from contextlib import contextmanager
|
||||
@@ -56,7 +57,7 @@ from homeassistant.const import (
|
||||
STATE_UNKNOWN,
|
||||
WEEKDAYS,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, State, callback
|
||||
from homeassistant.core import HomeAssistant, State, callback, split_entity_id
|
||||
from homeassistant.exceptions import (
|
||||
ConditionError,
|
||||
ConditionErrorContainer,
|
||||
@@ -87,6 +88,7 @@ from .automation import (
|
||||
move_options_fields_to_top_level,
|
||||
)
|
||||
from .integration_platform import async_process_integration_platforms
|
||||
from .recorder import get_instance
|
||||
from .selector import (
|
||||
NumericThresholdMode,
|
||||
NumericThresholdSelector,
|
||||
@@ -119,6 +121,16 @@ VALIDATE_CONFIG_FORMAT = "{}_validate_config"
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Upper bound on the best-effort recorder query used to prime `for:` durations
|
||||
# at setup. If history can't be read within this window we fall back to the
|
||||
# conservative live-state anchor rather than blocking condition setup.
|
||||
HISTORY_PRIME_TIMEOUT = 10
|
||||
|
||||
# How far back the `for:` priming query reaches. Caps the cost of the query for
|
||||
# very long `for:` durations; beyond this we rely on the live-state anchor, so
|
||||
# such conditions may only become true once enough time has elapsed since setup.
|
||||
MAX_HISTORY_PRIME_LOOKBACK = timedelta(hours=6)
|
||||
|
||||
_PLATFORM_ALIASES: dict[str | None, str | None] = {
|
||||
"and": None,
|
||||
"device": "device_automation",
|
||||
@@ -493,6 +505,11 @@ class EntityConditionBase(Condition):
|
||||
self._matcher = self._check_all_match_state
|
||||
self._on_unload: list[Callable[[], None]] = []
|
||||
self._valid_since: dict[str, datetime] = {}
|
||||
# Entities whose `for:` anchor is currently being resolved from recorder
|
||||
# history. While an entity is here the live listener leaves its anchor to
|
||||
# the prime, except that an invalidation removes it (the run broke, so the
|
||||
# in-flight history is stale and live tracking takes over).
|
||||
self._priming: set[str] = set()
|
||||
|
||||
def entity_filter(self, entities: set[str]) -> set[str]:
|
||||
"""Filter entities matching any of the domain specs."""
|
||||
@@ -533,11 +550,19 @@ class EntityConditionBase(Condition):
|
||||
and self._should_include(_state)
|
||||
and self.is_valid_state(_state)
|
||||
):
|
||||
# While an entity is being primed from history, leave its anchor to
|
||||
# the prime: the entity stayed valid, so the run is unbroken and the
|
||||
# history start (which can be earlier than this update) is accurate.
|
||||
if entity_id in self._priming:
|
||||
return
|
||||
# Only record the time if not already tracked, to avoid
|
||||
# resetting the duration on unrelated state/attribute updates.
|
||||
if entity_id not in self._valid_since:
|
||||
self._valid_since[entity_id] = self._state_valid_since(_state)
|
||||
else:
|
||||
# An invalidation breaks the run, so any history being loaded for the
|
||||
# entity is now stale; stop priming it and let live tracking own it.
|
||||
self._priming.discard(entity_id)
|
||||
self._valid_since.pop(entity_id, None)
|
||||
|
||||
@override
|
||||
@@ -557,24 +582,147 @@ class EntityConditionBase(Condition):
|
||||
|
||||
self._update_valid_since(entity_id, to_state)
|
||||
|
||||
@callback
|
||||
def _on_entities_update(added: set[str], removed: set[str]) -> None:
|
||||
"""Handle changes to the tracked entity set."""
|
||||
for entity_id in added:
|
||||
self._update_valid_since(entity_id, self._hass.states.get(entity_id))
|
||||
for entity_id in removed:
|
||||
self._valid_since.pop(entity_id, None)
|
||||
|
||||
unsub = async_track_target_selector_state_change_event(
|
||||
unsub = await async_track_target_selector_state_change_event(
|
||||
self._hass,
|
||||
self._target,
|
||||
_state_change_listener,
|
||||
self.entity_filter,
|
||||
_on_entities_update,
|
||||
self._async_on_entities_update,
|
||||
primary_entities_only=self._primary_entities_only,
|
||||
)
|
||||
self._on_unload.append(unsub)
|
||||
|
||||
async def _async_on_entities_update(
|
||||
self, added: set[str], removed: set[str]
|
||||
) -> None:
|
||||
"""Handle changes to the tracked entity set.
|
||||
|
||||
Removed entities stop being tracked immediately. Added entities are only
|
||||
considered by the condition once their `for:` anchor has been resolved
|
||||
(see `_async_prime_valid_since`); until then they are absent from
|
||||
`_valid_since`. The target tracker awaits this for the initial entity set
|
||||
at setup and runs it as a background task for later registry-driven
|
||||
changes.
|
||||
"""
|
||||
for entity_id in removed:
|
||||
self._priming.discard(entity_id)
|
||||
self._valid_since.pop(entity_id, None)
|
||||
await self._async_prime_valid_since(added)
|
||||
|
||||
async def _async_prime_valid_since(self, entity_ids: set[str]) -> None:
|
||||
"""Resolve and store the `for:` anchor for newly tracked entities.
|
||||
|
||||
For each currently-valid entity the anchor is the start of its current
|
||||
continuous run of validity, read from recorder history (bounded by
|
||||
`MAX_HISTORY_PRIME_LOOKBACK`) and combined with the current state's
|
||||
anchor, or the current state's anchor alone when the recorder is
|
||||
unavailable or the read fails. An entity is added to `_valid_since` only
|
||||
once this resolves, so a newly tracked entity does not participate in the
|
||||
condition until its anchor is known — rather than briefly using a
|
||||
conservative anchor that then changes.
|
||||
|
||||
While loading, an entity is held in `_priming`. A live change that keeps
|
||||
it valid is ignored (the run is unbroken, history is accurate), but an
|
||||
invalidation removes it from `_priming` so that we do not apply now-stale
|
||||
history over the live tracking that observed the break.
|
||||
"""
|
||||
# Conservative anchor from the live state for each currently-valid entity.
|
||||
anchors = {
|
||||
entity_id: self._state_valid_since(_state)
|
||||
for entity_id in entity_ids
|
||||
if (_state := self._hass.states.get(entity_id)) is not None
|
||||
and self._should_include(_state)
|
||||
and self.is_valid_state(_state)
|
||||
}
|
||||
if not anchors:
|
||||
return
|
||||
|
||||
self._priming.update(anchors)
|
||||
try:
|
||||
if "recorder" in self._hass.config.components:
|
||||
await self._async_refine_anchors_from_history(anchors)
|
||||
for entity_id, anchor in anchors.items():
|
||||
# Skip entities a live change invalidated mid-load: they were
|
||||
# removed from `_priming`, the run broke, and live tracking (which
|
||||
# saw the break) owns them — applying this history would be stale.
|
||||
if entity_id in self._priming:
|
||||
self._valid_since[entity_id] = anchor
|
||||
finally:
|
||||
self._priming.difference_update(anchors)
|
||||
|
||||
async def _async_refine_anchors_from_history(
|
||||
self, anchors: dict[str, datetime]
|
||||
) -> None:
|
||||
"""Move each anchor in `anchors` back to the true start of its run.
|
||||
|
||||
Mutates `anchors` in place.
|
||||
"""
|
||||
from sqlalchemy.exc import SQLAlchemyError # noqa: PLC0415
|
||||
|
||||
from homeassistant.components.recorder import history # noqa: PLC0415
|
||||
|
||||
if TYPE_CHECKING:
|
||||
assert self._duration is not None
|
||||
lookback = min(self._duration, MAX_HISTORY_PRIME_LOOKBACK)
|
||||
start_time = dt_util.utcnow() - lookback
|
||||
instance = get_instance(self._hass)
|
||||
try:
|
||||
async with asyncio.timeout(HISTORY_PRIME_TIMEOUT):
|
||||
# The history query only sees committed rows. Wait for the
|
||||
# recorder to flush its queue first.
|
||||
if (commit_future := instance.async_get_commit_future()) is not None:
|
||||
await commit_future
|
||||
historical_states = await instance.async_add_executor_job(
|
||||
ft.partial(
|
||||
history.get_significant_states,
|
||||
self._hass,
|
||||
start_time,
|
||||
entity_ids=list(anchors),
|
||||
include_start_time_state=True,
|
||||
# Mandatory: the default (True) drops attribute-only
|
||||
# changes for entities outside SIGNIFICANT_DOMAINS, which
|
||||
# are exactly the transitions attribute-based conditions
|
||||
# depend on.
|
||||
significant_changes_only=False,
|
||||
minimal_response=False,
|
||||
)
|
||||
)
|
||||
except (SQLAlchemyError, TimeoutError) as err:
|
||||
# Best effort: keep the conservative anchors rather than failing.
|
||||
_LOGGER.debug("Error priming condition durations from history: %s", err)
|
||||
return
|
||||
|
||||
for entity_id, rows in historical_states.items():
|
||||
valid_since = self._valid_since_from_history(
|
||||
entity_id, cast(list[State], rows)
|
||||
)
|
||||
if valid_since is not None:
|
||||
anchors[entity_id] = min(valid_since, anchors[entity_id])
|
||||
|
||||
def _valid_since_from_history(
|
||||
self, entity_id: str, rows: Iterable[State]
|
||||
) -> datetime | None:
|
||||
"""Return when the current continuous run of valid states began.
|
||||
|
||||
Walks historical states oldest-first and returns the anchor time of the
|
||||
first state in the most recent unbroken run of valid states, or None if
|
||||
the most recently recorded state is not valid (e.g. the recorder lags
|
||||
behind the live state machine).
|
||||
"""
|
||||
# Recorder rows are LazyState objects, which skip State.__init__ and so
|
||||
# never populate the domain/object_id that the validity checks rely on.
|
||||
domain, object_id = split_entity_id(entity_id)
|
||||
valid_since: datetime | None = None
|
||||
for _state in rows:
|
||||
_state.domain = domain
|
||||
_state.object_id = object_id
|
||||
if self._should_include(_state) and self.is_valid_state(_state):
|
||||
if valid_since is None:
|
||||
valid_since = self._state_valid_since(_state)
|
||||
else:
|
||||
valid_since = None
|
||||
return valid_since
|
||||
|
||||
@override
|
||||
def _async_unload(self) -> None:
|
||||
"""Unsubscribe from listeners."""
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
"""Helpers for dealing with entity targets."""
|
||||
|
||||
import abc
|
||||
from collections.abc import Callable
|
||||
import asyncio
|
||||
from collections.abc import Callable, Coroutine
|
||||
import dataclasses
|
||||
import logging
|
||||
from logging import Logger
|
||||
@@ -292,7 +293,7 @@ class TargetEntityChangeTracker(abc.ABC):
|
||||
|
||||
self._registry_unsubs: list[CALLBACK_TYPE] = []
|
||||
|
||||
def async_setup(self) -> Callable[[], None]:
|
||||
async def async_setup(self) -> Callable[[], None]:
|
||||
"""Set up the state change tracking."""
|
||||
self._setup_registry_listeners()
|
||||
self._handle_target_update()
|
||||
@@ -304,18 +305,20 @@ class TargetEntityChangeTracker(abc.ABC):
|
||||
"""Called when there's an update to tracked target entities."""
|
||||
|
||||
@callback
|
||||
def _handle_target_update(self, event: Event[Any] | None = None) -> None:
|
||||
"""Handle updates in the tracked targets."""
|
||||
def _referenced_entities(self) -> set[str]:
|
||||
"""Return the currently tracked, filtered entity ids."""
|
||||
selected = async_extract_referenced_entity_ids(
|
||||
self._hass,
|
||||
self._target_selection,
|
||||
expand_group=False,
|
||||
primary_entities_only=self._primary_entities_only,
|
||||
)
|
||||
filtered_entities = self._entity_filter(
|
||||
selected.referenced | selected.indirectly_referenced
|
||||
)
|
||||
self._handle_entities_update(filtered_entities)
|
||||
return self._entity_filter(selected.referenced | selected.indirectly_referenced)
|
||||
|
||||
@callback
|
||||
def _handle_target_update(self, event: Event[Any] | None = None) -> None:
|
||||
"""Handle updates in the tracked targets."""
|
||||
self._handle_entities_update(self._referenced_entities())
|
||||
|
||||
def _setup_registry_listeners(self) -> None:
|
||||
"""Set up listeners for registry changes that require resubscription."""
|
||||
@@ -356,11 +359,20 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
|
||||
target_selection: TargetSelection,
|
||||
action: Callable[[TargetStateChangedData], Any],
|
||||
entity_filter: Callable[[set[str]], set[str]],
|
||||
on_entities_update: Callable[[set[str], set[str]], None] | None = None,
|
||||
on_entities_update: Callable[
|
||||
[set[str], set[str]], Coroutine[Any, Any, None] | None
|
||||
]
|
||||
| None = None,
|
||||
*,
|
||||
primary_entities_only: bool = True,
|
||||
) -> None:
|
||||
"""Initialize the state change tracker."""
|
||||
"""Initialize the state change tracker.
|
||||
|
||||
`on_entities_update` may be a plain callback or a coroutine function.
|
||||
A coroutine is awaited for the initial entity set (so setup is
|
||||
deterministic) and scheduled as a background task for later
|
||||
registry-driven changes.
|
||||
"""
|
||||
super().__init__(
|
||||
hass,
|
||||
target_selection,
|
||||
@@ -371,17 +383,55 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
|
||||
self._on_entities_update = on_entities_update
|
||||
self._state_change_unsub: CALLBACK_TYPE | None = None
|
||||
self._tracked_entities: set[str] = set()
|
||||
self._update_tasks: set[asyncio.Task[None]] = set()
|
||||
|
||||
async def async_setup(self) -> Callable[[], None]:
|
||||
"""Set up tracking, awaiting the update for the initial entity set.
|
||||
|
||||
The initial update is awaited so that a coroutine `on_entities_update`
|
||||
(e.g. one that loads history) completes before setup returns. Later
|
||||
registry-driven updates instead arrive via the callback
|
||||
`_handle_entities_update` and are scheduled as background tasks.
|
||||
"""
|
||||
self._setup_registry_listeners()
|
||||
entities = self._referenced_entities()
|
||||
if (coro := self._apply_entities_update(entities)) is not None:
|
||||
await coro
|
||||
return self._unsubscribe
|
||||
|
||||
@callback
|
||||
def _handle_entities_update(self, tracked_entities: set[str]) -> None:
|
||||
"""Handle the tracked entities."""
|
||||
"""Handle a registry-driven change to the tracked entity set."""
|
||||
if (coro := self._apply_entities_update(tracked_entities)) is None:
|
||||
return
|
||||
# Track the task so it can be cancelled on unsubscribe and so its
|
||||
# exception is retrieved (and logged) rather than surfacing only via
|
||||
# asyncio's "Task exception was never retrieved" at garbage collection.
|
||||
task = self._hass.async_create_task(coro, "Target entity tracker update")
|
||||
self._update_tasks.add(task)
|
||||
task.add_done_callback(self._on_update_task_done)
|
||||
|
||||
def _on_update_task_done(self, task: asyncio.Task[None]) -> None:
|
||||
"""Drop a finished update task and surface any unexpected error."""
|
||||
self._update_tasks.discard(task)
|
||||
if not task.cancelled() and (exc := task.exception()) is not None:
|
||||
_LOGGER.error(
|
||||
"Error handling tracked entities update: %s", exc, exc_info=exc
|
||||
)
|
||||
|
||||
def _apply_entities_update(
|
||||
self, tracked_entities: set[str]
|
||||
) -> Coroutine[Any, Any, None] | None:
|
||||
"""Resubscribe to state changes; return the update coroutine, if any."""
|
||||
previous_entities = self._tracked_entities
|
||||
self._tracked_entities = tracked_entities
|
||||
|
||||
result: Coroutine[Any, Any, None] | None = None
|
||||
if self._on_entities_update is not None:
|
||||
added = tracked_entities - previous_entities
|
||||
removed = previous_entities - tracked_entities
|
||||
if added or removed:
|
||||
self._on_entities_update(added, removed)
|
||||
result = self._on_entities_update(added, removed)
|
||||
|
||||
@callback
|
||||
def state_change_listener(event: Event[EventStateChangedData]) -> None:
|
||||
@@ -395,6 +445,7 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
|
||||
self._state_change_unsub = async_track_state_change_event(
|
||||
self._hass, tracked_entities, state_change_listener
|
||||
)
|
||||
return result
|
||||
|
||||
def _unsubscribe(self) -> None:
|
||||
"""Unsubscribe from all events."""
|
||||
@@ -402,14 +453,18 @@ class TargetStateChangeTracker(TargetEntityChangeTracker):
|
||||
if self._state_change_unsub:
|
||||
self._state_change_unsub()
|
||||
self._state_change_unsub = None
|
||||
for task in self._update_tasks:
|
||||
task.cancel()
|
||||
self._update_tasks.clear()
|
||||
|
||||
|
||||
def async_track_target_selector_state_change_event(
|
||||
async def async_track_target_selector_state_change_event(
|
||||
hass: HomeAssistant,
|
||||
target_selector_config: ConfigType,
|
||||
action: Callable[[TargetStateChangedData], Any],
|
||||
entity_filter: Callable[[set[str]], set[str]] = lambda x: x,
|
||||
on_entities_update: Callable[[set[str], set[str]], None] | None = None,
|
||||
on_entities_update: Callable[[set[str], set[str]], Coroutine[Any, Any, None] | None]
|
||||
| None = None,
|
||||
*,
|
||||
primary_entities_only: bool = True,
|
||||
) -> CALLBACK_TYPE:
|
||||
@@ -419,6 +474,10 @@ def async_track_target_selector_state_change_event(
|
||||
When `primary_entities_only` is True, indirect target
|
||||
expansion (via device, area, and floor) skips entities
|
||||
with an `entity_category` (config or diagnostic entities).
|
||||
|
||||
`on_entities_update` may be a coroutine function; it is awaited for the
|
||||
initial entity set and scheduled as a task for later registry-driven
|
||||
changes, so this function must itself be awaited.
|
||||
"""
|
||||
target_selection = TargetSelection(target_selector_config)
|
||||
if not target_selection.has_any_target:
|
||||
@@ -435,4 +494,4 @@ def async_track_target_selector_state_change_event(
|
||||
on_entities_update,
|
||||
primary_entities_only=primary_entities_only,
|
||||
)
|
||||
return tracker.async_setup()
|
||||
return await tracker.async_setup()
|
||||
|
||||
@@ -579,7 +579,7 @@ class EntityTriggerBase(Trigger):
|
||||
),
|
||||
)
|
||||
|
||||
unsub = async_track_target_selector_state_change_event(
|
||||
unsub = await async_track_target_selector_state_change_event(
|
||||
self._hass,
|
||||
self._target,
|
||||
state_change_listener,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Test the condition helper."""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Mapping
|
||||
from contextlib import AbstractContextManager, nullcontext as does_not_raise
|
||||
from dataclasses import dataclass, field
|
||||
@@ -13,12 +14,14 @@ from freezegun import freeze_time
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
from pytest_unordered import unordered
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.device_automation import (
|
||||
DOMAIN as DEVICE_AUTOMATION_DOMAIN,
|
||||
)
|
||||
from homeassistant.components.light import DOMAIN as LIGHT_DOMAIN
|
||||
from homeassistant.components.recorder import Recorder, get_instance, history
|
||||
from homeassistant.components.sensor import SensorDeviceClass
|
||||
from homeassistant.components.sun import DOMAIN as SUN_DOMAIN
|
||||
from homeassistant.components.system_health import DOMAIN as SYSTEM_HEALTH_DOMAIN
|
||||
@@ -60,6 +63,7 @@ from homeassistant.helpers.condition import (
|
||||
BEHAVIOR_ALL,
|
||||
BEHAVIOR_ANY,
|
||||
CONDITIONS,
|
||||
MAX_HISTORY_PRIME_LOOKBACK,
|
||||
Condition,
|
||||
ConditionChecker,
|
||||
EntityConditionBase,
|
||||
@@ -79,6 +83,7 @@ from homeassistant.util.unit_conversion import TemperatureConverter
|
||||
from homeassistant.util.yaml.loader import parse_yaml
|
||||
|
||||
from tests.common import MockModule, MockPlatform, mock_integration, mock_platform
|
||||
from tests.components.recorder.common import async_wait_recording_done
|
||||
from tests.typing import WebSocketGenerator
|
||||
|
||||
|
||||
@@ -5074,12 +5079,15 @@ async def test_state_condition_attr_duration_initial_state(
|
||||
duration: int,
|
||||
initially_met: bool,
|
||||
) -> None:
|
||||
"""Test attribute-based condition initialization from existing state.
|
||||
"""Test attribute-based condition initialization without a recorder.
|
||||
|
||||
The condition uses last_updated (not last_changed) to determine how long
|
||||
an attribute-based condition has been true. This is conservative: when
|
||||
With no recorder available the condition falls back to anchoring `for:`
|
||||
durations to the current state's last_updated. This is conservative: when
|
||||
the main state changes but the tracked attribute stays the same,
|
||||
last_updated is bumped and the effective duration resets.
|
||||
last_updated is bumped and the effective duration resets (see the
|
||||
`state_change_bumps_last_updated_not_met` case). The recorder-backed
|
||||
variant in test_state_condition_attr_duration_initial_state_from_history
|
||||
refines this from real history.
|
||||
"""
|
||||
for step in steps:
|
||||
freezer.tick(timedelta(seconds=step.delay_before))
|
||||
@@ -5313,6 +5321,611 @@ async def test_state_condition_attr_duration_unrelated_attr_update(
|
||||
assert test.async_check() is True
|
||||
|
||||
|
||||
async def _record_attr_steps(
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
start: datetime,
|
||||
entity_id: str,
|
||||
steps: list[_AttrInitStep],
|
||||
) -> int:
|
||||
"""Record a series of state writes into the recorder at controlled times.
|
||||
|
||||
Returns the number of seconds elapsed from `start` to the final write.
|
||||
"""
|
||||
elapsed = 0
|
||||
for step in steps:
|
||||
elapsed += step.delay_before
|
||||
freezer.move_to(start + timedelta(seconds=elapsed))
|
||||
hass.states.async_set(entity_id, step.state, step.attrs)
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
return elapsed
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("steps", "wait_before_setup", "initially_met"),
|
||||
[
|
||||
# Valid the entire time → met (10s >= 5s).
|
||||
([_AttrInitStep(STATE_ON, {"test_attr": True})], 10, True),
|
||||
# Valid for less than the `for:` window → not met (3s < 5s).
|
||||
([_AttrInitStep(STATE_ON, {"test_attr": True})], 3, False),
|
||||
# The tracked attribute stayed valid across an unrelated main-state
|
||||
# change. The OFF write bumps last_updated, but history shows the
|
||||
# attribute never left the valid range → met. This is exactly the case
|
||||
# the conservative last_updated anchor reports wrong (it returns False;
|
||||
# see test_state_condition_attr_duration_initial_state).
|
||||
(
|
||||
[
|
||||
_AttrInitStep(STATE_ON, {"test_attr": True}),
|
||||
_AttrInitStep(STATE_OFF, {"test_attr": True}, delay_before=8),
|
||||
],
|
||||
2,
|
||||
True,
|
||||
),
|
||||
# Invalid, then valid 6s before setup → met (6s >= 5s).
|
||||
(
|
||||
[
|
||||
_AttrInitStep(STATE_ON, {"test_attr": False}),
|
||||
_AttrInitStep(STATE_ON, {"test_attr": True}, delay_before=4),
|
||||
],
|
||||
6,
|
||||
True,
|
||||
),
|
||||
# Invalid, then valid only 4s before setup → not met (4s < 5s).
|
||||
(
|
||||
[
|
||||
_AttrInitStep(STATE_ON, {"test_attr": False}),
|
||||
_AttrInitStep(STATE_ON, {"test_attr": True}, delay_before=6),
|
||||
],
|
||||
4,
|
||||
False,
|
||||
),
|
||||
],
|
||||
ids=[
|
||||
"valid_long_enough",
|
||||
"valid_too_short",
|
||||
"valid_across_state_change",
|
||||
"invalid_then_valid_met",
|
||||
"invalid_then_valid_not_met",
|
||||
],
|
||||
)
|
||||
async def test_state_condition_attr_duration_initial_state_from_history(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
steps: list[_AttrInitStep],
|
||||
wait_before_setup: int,
|
||||
initially_met: bool,
|
||||
) -> None:
|
||||
"""Test attribute-based `for:` priming from recorder history.
|
||||
|
||||
With the recorder available, the condition walks recent history to find
|
||||
when the tracked value actually entered its current continuous valid run,
|
||||
rather than conservatively anchoring to the current state's last_updated.
|
||||
The `valid_across_state_change` case is the key improvement: an unrelated
|
||||
main-state change no longer resets the duration.
|
||||
"""
|
||||
entity_id = "test.entity_1"
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
elapsed = await _record_attr_steps(hass, freezer, start, entity_id, steps)
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=elapsed + wait_before_setup))
|
||||
test = await _setup_attr_state_condition(
|
||||
hass,
|
||||
entity_ids=entity_id,
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
assert test.async_check() is initially_met
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_history_includes_attr_only_changes(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Attribute-only invalidations inside the window must reset the timer.
|
||||
|
||||
The tracked value dips invalid and recovers through attribute-only changes
|
||||
(the main state stays ON throughout). Those rows are only returned when the
|
||||
history query passes significant_changes_only=False; were they dropped, the
|
||||
window would look continuously valid and the condition would wrongly report
|
||||
the `for:` duration as met.
|
||||
"""
|
||||
entity_id = "test.entity_1"
|
||||
start = dt_util.utcnow()
|
||||
steps = [
|
||||
_AttrInitStep(STATE_ON, {"test_attr": True}),
|
||||
_AttrInitStep(STATE_ON, {"test_attr": False}, delay_before=6),
|
||||
_AttrInitStep(STATE_ON, {"test_attr": True}, delay_before=2),
|
||||
]
|
||||
with freeze_time(start) as freezer:
|
||||
elapsed = await _record_attr_steps(hass, freezer, start, entity_id, steps)
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=elapsed + 2))
|
||||
test = await _setup_attr_state_condition(
|
||||
hass,
|
||||
entity_ids=entity_id,
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
# Valid only for the last 2s (since the recovery at t=8); the dip to
|
||||
# invalid at t=6 falls inside the 5s window → not met.
|
||||
assert test.async_check() is False
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("behavior", "expected"),
|
||||
[(BEHAVIOR_ANY, True), (BEHAVIOR_ALL, False)],
|
||||
)
|
||||
async def test_state_condition_attr_duration_from_history_multiple_entities(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
behavior: str,
|
||||
expected: bool,
|
||||
) -> None:
|
||||
"""History priming covers every targeted entity in a single query.
|
||||
|
||||
entity_1 has been valid long enough; entity_2 only recently became valid,
|
||||
so `any` passes while `all` does not.
|
||||
"""
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
hass.states.async_set("test.entity_1", STATE_ON, {"test_attr": True})
|
||||
hass.states.async_set("test.entity_2", STATE_ON, {"test_attr": False})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=7))
|
||||
hass.states.async_set("test.entity_2", STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=10))
|
||||
test = await _setup_attr_state_condition(
|
||||
hass,
|
||||
entity_ids=["test.entity_1", "test.entity_2"],
|
||||
states={True},
|
||||
condition_options={ATTR_BEHAVIOR: behavior, CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
# entity_1 valid for 10s (met); entity_2 valid for only 3s (not met).
|
||||
assert test.async_check() is expected
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"history_error",
|
||||
[SQLAlchemyError("boom"), TimeoutError()],
|
||||
ids=["db_error", "timeout"],
|
||||
)
|
||||
async def test_state_condition_attr_duration_history_error_falls_back(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
history_error: Exception,
|
||||
) -> None:
|
||||
"""A failing/slow history query must not break setup; it falls back.
|
||||
|
||||
The tracked attribute stayed valid across an unrelated main-state change,
|
||||
so a successful history read would report the duration as met. When the
|
||||
query errors or times out, the condition keeps the conservative
|
||||
last_updated anchor (set when the tracker was wired up) instead, which here
|
||||
reports not met — and crucially, setup does not raise.
|
||||
"""
|
||||
entity_id = "test.entity_1"
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
freezer.move_to(start + timedelta(seconds=8))
|
||||
hass.states.async_set(entity_id, STATE_OFF, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=10))
|
||||
with patch(
|
||||
"homeassistant.components.recorder.history.get_significant_states",
|
||||
side_effect=history_error,
|
||||
):
|
||||
test = await _setup_attr_state_condition(
|
||||
hass,
|
||||
entity_ids=entity_id,
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
|
||||
# Fell back to the conservative anchor (last_updated bumped at t=8),
|
||||
# so the 5s `for:` is not satisfied 2s later.
|
||||
assert test.async_check() is False
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_history_lookback_capped(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""The history lookback is capped, regardless of a longer `for:` duration."""
|
||||
entity_id = "test.entity_1"
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start):
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
captured: dict[str, datetime] = {}
|
||||
|
||||
def _capture(hass_: HomeAssistant, start_time: datetime, **kwargs: Any) -> dict:
|
||||
captured["start_time"] = start_time
|
||||
return {}
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.recorder.history.get_significant_states",
|
||||
side_effect=_capture,
|
||||
):
|
||||
await _setup_attr_state_condition(
|
||||
hass,
|
||||
entity_ids=entity_id,
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"hours": 8}},
|
||||
)
|
||||
|
||||
# The 8h `for:` is clamped to the 6h cap.
|
||||
assert dt_util.utcnow() - captured["start_time"] == MAX_HISTORY_PRIME_LOOKBACK
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_history_long_for_uses_live_anchor(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""A `for:` longer than the lookback cap still uses the live anchor.
|
||||
|
||||
The entity has been valid for 10h (longer than the 6h history cap). History
|
||||
alone can only prove the last 6h, but the live state's last_updated (10h
|
||||
ago, never changed) proves the full run, so the 8h `for:` is met. This
|
||||
requires combining history with the live anchor rather than overriding it.
|
||||
"""
|
||||
entity_id = "test.entity_1"
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
freezer.move_to(start + timedelta(hours=10))
|
||||
test = await _setup_attr_state_condition(
|
||||
hass,
|
||||
entity_ids=entity_id,
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"hours": 8}},
|
||||
)
|
||||
assert test.async_check() is True
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_history_loaded_for_added_entity(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""History is loaded for an entity added to the target after setup.
|
||||
|
||||
The entity is only tracked once it gains the targeted label. Resolving its
|
||||
anchor runs in a background task, and the entity is not counted until that
|
||||
completes (no interim conservative anchor). Once loaded, its anchor comes
|
||||
from history just like the initial set: the attribute stayed valid across an
|
||||
unrelated main-state change, so the duration is met even though the live
|
||||
last_updated anchor alone (bumped by the OFF write) would report not met.
|
||||
"""
|
||||
label_reg = lr.async_get(hass)
|
||||
label = label_reg.async_create("Test Late History")
|
||||
entity_reg = er.async_get(hass)
|
||||
entry = entity_reg.async_get_or_create(
|
||||
domain="test", platform="test", unique_id="late_history"
|
||||
)
|
||||
entity_id = entry.entity_id
|
||||
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
# Valid since t=0; unrelated main-state change at t=8 keeps the attr valid.
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
freezer.move_to(start + timedelta(seconds=8))
|
||||
hass.states.async_set(entity_id, STATE_OFF, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
# The entity has no label yet, so it is not tracked at setup.
|
||||
freezer.move_to(start + timedelta(seconds=10))
|
||||
test = await _setup_attr_state_condition_with_target(
|
||||
hass,
|
||||
target={ATTR_LABEL_ID: label.label_id},
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
assert test.async_check() is False
|
||||
|
||||
# Adding the label tracks the entity, but its anchor is resolved in a
|
||||
# background task. Until that completes the entity has no _valid_since
|
||||
# entry and is not counted yet — even though it will be met once loaded.
|
||||
# Hold the recorder flush open so the load deterministically cannot
|
||||
# finish before the intermediate check.
|
||||
instance = get_instance(hass)
|
||||
gate: asyncio.Future[None] = hass.loop.create_future()
|
||||
with patch.object(instance, "async_get_commit_future", return_value=gate):
|
||||
entity_reg.async_update_entity(entity_id, labels={label.label_id})
|
||||
assert test.async_check() is False
|
||||
|
||||
# Release the flush; the query runs and the anchor is stored.
|
||||
gate.set_result(None)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# History loaded: continuously valid for 10s → 5s `for:` is met.
|
||||
assert test.async_check() is True
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_not_counted_while_history_loads(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""The known gap: a new entity is not counted while its history loads.
|
||||
|
||||
Resolving a newly tracked entity's `for:` anchor is asynchronous. Until the
|
||||
recorder read completes the entity has no `_valid_since` entry, so the
|
||||
condition does not count it — even though it will be met once loaded. This
|
||||
holds the recorder read open to observe that window deterministically.
|
||||
"""
|
||||
label_reg = lr.async_get(hass)
|
||||
label = label_reg.async_create("Loading Gap")
|
||||
entity_reg = er.async_get(hass)
|
||||
entry = entity_reg.async_get_or_create(
|
||||
domain="test", platform="test", unique_id="loading_gap"
|
||||
)
|
||||
entity_id = entry.entity_id
|
||||
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
# Valid for 10s by the time it is added, so once loaded the 5s `for:`
|
||||
# is met.
|
||||
freezer.move_to(start + timedelta(seconds=10))
|
||||
test = await _setup_attr_state_condition_with_target(
|
||||
hass,
|
||||
target={ATTR_LABEL_ID: label.label_id},
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
assert test.async_check() is False
|
||||
|
||||
# Hold the recorder flush open so the background history load can't
|
||||
# finish, then add the entity to the target.
|
||||
instance = get_instance(hass)
|
||||
gate: asyncio.Future[None] = hass.loop.create_future()
|
||||
with patch.object(instance, "async_get_commit_future", return_value=gate):
|
||||
entity_reg.async_update_entity(entity_id, labels={label.label_id})
|
||||
# Let the prime task start and block on the held flush.
|
||||
await asyncio.sleep(0)
|
||||
# Load in flight → no anchor yet → entity not counted.
|
||||
assert test.async_check() is False
|
||||
|
||||
# Release the flush; the query runs and the anchor is stored.
|
||||
gate.set_result(None)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Loaded now → met.
|
||||
assert test.async_check() is True
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_benign_change_during_load_keeps_history(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""A valid live change during the load does not discard history.
|
||||
|
||||
If the entity stays valid while its history loads, the run is unbroken and
|
||||
history's earlier run-start is still accurate, so it is applied. An unrelated
|
||||
attribute write must not reset the anchor to "now".
|
||||
"""
|
||||
label_reg = lr.async_get(hass)
|
||||
label = label_reg.async_create("Benign During Load")
|
||||
entity_reg = er.async_get(hass)
|
||||
entry = entity_reg.async_get_or_create(
|
||||
domain="test", platform="test", unique_id="benign_during_load"
|
||||
)
|
||||
entity_id = entry.entity_id
|
||||
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
# Valid since t=0; history anchors here, well past the 5s `for:`.
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=10))
|
||||
test = await _setup_attr_state_condition_with_target(
|
||||
hass,
|
||||
target={ATTR_LABEL_ID: label.label_id},
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
|
||||
instance = get_instance(hass)
|
||||
gate: asyncio.Future[None] = hass.loop.create_future()
|
||||
with patch.object(instance, "async_get_commit_future", return_value=gate):
|
||||
entity_reg.async_update_entity(entity_id, labels={label.label_id})
|
||||
await asyncio.sleep(0) # prime task blocks on the held flush
|
||||
|
||||
# Unrelated attribute write while loading: still valid (run unbroken).
|
||||
hass.states.async_set(
|
||||
entity_id, STATE_ON, {"test_attr": True, "other": "x"}
|
||||
)
|
||||
await asyncio.sleep(0)
|
||||
|
||||
# Advance so the change-time anchor alone would satisfy the 5s `for:`.
|
||||
# The entity must still not be counted while its history is loading —
|
||||
# the live listener leaves primed entities alone, so there is no
|
||||
# interim anchor for the change to set.
|
||||
freezer.move_to(start + timedelta(seconds=18))
|
||||
assert test.async_check() is False
|
||||
|
||||
gate.set_result(None)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# History was applied despite the benign change → valid since t=0 → met.
|
||||
assert test.async_check() is True
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_invalidation_during_load_discards_history(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""An invalidation during the load discards the (now stale) history.
|
||||
|
||||
The commit flush only guarantees history up to the flush point; a dip that
|
||||
commits after it is invisible to the query, so history would still show the
|
||||
old continuous run. The live listener saw the break, so on revalidation the
|
||||
anchor comes from live tracking (the post-dip time), not history.
|
||||
"""
|
||||
label_reg = lr.async_get(hass)
|
||||
label = label_reg.async_create("Dip During Load")
|
||||
entity_reg = er.async_get(hass)
|
||||
entry = entity_reg.async_get_or_create(
|
||||
domain="test", platform="test", unique_id="dip_during_load"
|
||||
)
|
||||
entity_id = entry.entity_id
|
||||
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
# Valid since t=0; history alone would (stalely) anchor here and report
|
||||
# the 5s `for:` as met.
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=10))
|
||||
test = await _setup_attr_state_condition_with_target(
|
||||
hass,
|
||||
target={ATTR_LABEL_ID: label.label_id},
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
|
||||
instance = get_instance(hass)
|
||||
gate: asyncio.Future[None] = hass.loop.create_future()
|
||||
with patch.object(instance, "async_get_commit_future", return_value=gate):
|
||||
entity_reg.async_update_entity(entity_id, labels={label.label_id})
|
||||
await asyncio.sleep(0) # prime task blocks on the held flush
|
||||
|
||||
# Dip invalid then valid again while loading: the run broke.
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": False})
|
||||
await asyncio.sleep(0)
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await asyncio.sleep(0)
|
||||
|
||||
gate.set_result(None)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Stale history was discarded; the anchor is the post-dip time (now), so
|
||||
# the 5s `for:` is not yet met.
|
||||
assert test.async_check() is False
|
||||
|
||||
|
||||
async def test_state_condition_attr_duration_history_flushes_before_query(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""Pending recorder writes are flushed before the history query.
|
||||
|
||||
`get_significant_states` only sees committed rows. A state change that
|
||||
already happened but is still queued in the recorder would be missed by
|
||||
both the query and the live listener (which only sees changes after it
|
||||
subscribes), so the queue must be flushed before querying.
|
||||
"""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_ON, {"test_attr": True})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
call_order: list[str] = []
|
||||
instance = get_instance(hass)
|
||||
real_commit_future = instance.async_get_commit_future
|
||||
real_query = history.get_significant_states
|
||||
|
||||
def _spy_commit_future() -> Any:
|
||||
call_order.append("flush")
|
||||
return real_commit_future()
|
||||
|
||||
def _spy_query(*args: Any, **kwargs: Any) -> Any:
|
||||
call_order.append("query")
|
||||
return real_query(*args, **kwargs)
|
||||
|
||||
with (
|
||||
patch.object(instance, "async_get_commit_future", _spy_commit_future),
|
||||
patch(
|
||||
"homeassistant.components.recorder.history.get_significant_states",
|
||||
_spy_query,
|
||||
),
|
||||
):
|
||||
await _setup_attr_state_condition(
|
||||
hass,
|
||||
entity_ids=entity_id,
|
||||
states={True},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
|
||||
assert call_order == ["flush", "query"]
|
||||
|
||||
|
||||
async def test_state_condition_multi_state_duration_uses_history(
|
||||
recorder_mock: Recorder, hass: HomeAssistant
|
||||
) -> None:
|
||||
"""A multi-state condition reads history to anchor across in-set toggles.
|
||||
|
||||
A transition within the valid set (here ON->OFF) bumps `last_changed` even
|
||||
though the condition stays valid, so `last_changed` alone is too
|
||||
conservative; history finds the true start of the run.
|
||||
"""
|
||||
entity_id = "test.entity_1"
|
||||
start = dt_util.utcnow()
|
||||
with freeze_time(start) as freezer:
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
# Toggle within the valid set: still valid, but last_changed jumps to t=8.
|
||||
freezer.move_to(start + timedelta(seconds=8))
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
freezer.move_to(start + timedelta(seconds=10))
|
||||
test = await _setup_state_condition(
|
||||
hass,
|
||||
states={STATE_ON, STATE_OFF},
|
||||
target_config={CONF_ENTITY_ID: [entity_id]},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
|
||||
# Valid (ON or OFF) for 10s. last_changed alone (t=8) would report not
|
||||
# met; history anchors to the start of the run, so the 5s `for:` is met.
|
||||
assert test.async_check() is True
|
||||
|
||||
|
||||
async def test_state_condition_single_state_duration_skips_history(
|
||||
recorder_mock: Recorder,
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
) -> None:
|
||||
"""A single-state condition uses last_changed directly and reads no history.
|
||||
|
||||
`_needs_duration_tracking` is False for single-state, no-value_source
|
||||
conditions, so setup never sets up tracking or queries the recorder.
|
||||
"""
|
||||
hass.states.async_set("test.entity_1", STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.recorder.history.get_significant_states",
|
||||
return_value={},
|
||||
) as mock_history:
|
||||
test = await _setup_state_condition(
|
||||
hass,
|
||||
states=STATE_ON,
|
||||
target_config={CONF_ENTITY_ID: ["test.entity_1"]},
|
||||
condition_options={CONF_FOR: {"seconds": 5}},
|
||||
)
|
||||
|
||||
mock_history.assert_not_called()
|
||||
|
||||
# The anchor comes straight from state.last_changed, so the duration is met.
|
||||
freezer.tick(timedelta(seconds=6))
|
||||
assert test.async_check() is True
|
||||
|
||||
|
||||
class _AttributeBackedStateCondition(EntityConditionBase):
|
||||
"""Test condition that reads an attribute directly in `is_valid_state`.
|
||||
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
"""Test service helpers."""
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.group import Group
|
||||
@@ -544,7 +546,7 @@ async def test_async_track_target_selector_state_change_event_empty_selector(
|
||||
"""Handle state change events."""
|
||||
|
||||
with pytest.raises(HomeAssistantError) as excinfo:
|
||||
target.async_track_target_selector_state_change_event(
|
||||
await target.async_track_target_selector_state_change_event(
|
||||
hass, {}, state_change_callback
|
||||
)
|
||||
assert str(excinfo.value) == (
|
||||
@@ -626,7 +628,7 @@ async def test_async_track_target_selector_state_change_event(
|
||||
ATTR_FLOOR_ID: floor,
|
||||
ATTR_LABEL_ID: label,
|
||||
}
|
||||
unsub = target.async_track_target_selector_state_change_event(
|
||||
unsub = await target.async_track_target_selector_state_change_event(
|
||||
hass, selector_config, state_change_callback
|
||||
)
|
||||
|
||||
@@ -762,7 +764,7 @@ async def test_async_track_target_selector_state_change_event_filter(
|
||||
ATTR_ENTITY_ID: targeted_entity,
|
||||
ATTR_LABEL_ID: label,
|
||||
}
|
||||
unsub = target.async_track_target_selector_state_change_event(
|
||||
unsub = await target.async_track_target_selector_state_change_event(
|
||||
hass, selector_config, state_change_callback, entity_filter
|
||||
)
|
||||
|
||||
@@ -835,7 +837,7 @@ async def test_async_track_target_selector_state_change_event_on_entities_update
|
||||
hass.states.async_set(entity_b.entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
unsub = target.async_track_target_selector_state_change_event(
|
||||
unsub = await target.async_track_target_selector_state_change_event(
|
||||
hass,
|
||||
{ATTR_LABEL_ID: label.label_id},
|
||||
state_change_callback,
|
||||
@@ -889,6 +891,108 @@ async def test_async_track_target_selector_state_change_event_on_entities_update
|
||||
assert len(entity_updates) == 0
|
||||
|
||||
|
||||
async def test_async_track_target_selector_update_task_error_is_logged(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""An error in a registry-driven async on_entities_update is logged.
|
||||
|
||||
Such updates run as background tasks; an unexpected exception must be
|
||||
retrieved and logged rather than left for asyncio to surface at GC time.
|
||||
"""
|
||||
|
||||
@callback
|
||||
def state_change_callback(event: target.TargetStateChangedData) -> None:
|
||||
"""Handle state change events."""
|
||||
|
||||
async def on_entities_update(added: set[str], removed: set[str]) -> None:
|
||||
raise ValueError("boom")
|
||||
|
||||
entity_reg = er.async_get(hass)
|
||||
label_reg = lr.async_get(hass)
|
||||
label = label_reg.async_create("Error Test")
|
||||
entity = entity_reg.async_get_or_create(
|
||||
domain="light", platform="test", unique_id="err_a"
|
||||
)
|
||||
hass.states.async_set(entity.entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# No entity has the label yet, so the awaited initial update does not fire
|
||||
# and setup succeeds.
|
||||
unsub = await target.async_track_target_selector_state_change_event(
|
||||
hass,
|
||||
{ATTR_LABEL_ID: label.label_id},
|
||||
state_change_callback,
|
||||
on_entities_update=on_entities_update,
|
||||
)
|
||||
|
||||
# A registry change adds the entity, so the async callback runs as a task
|
||||
# and raises; the error is logged, not swallowed.
|
||||
entity_reg.async_update_entity(entity.entity_id, labels={label.label_id})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert "Error handling tracked entities update" in caplog.text
|
||||
assert "boom" in caplog.text
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_async_track_target_selector_cancels_update_task_on_unsubscribe(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Unsubscribing cancels an in-flight registry-driven update task."""
|
||||
started = asyncio.Event()
|
||||
release = asyncio.Event() # intentionally never set
|
||||
cancelled = False
|
||||
|
||||
@callback
|
||||
def state_change_callback(event: target.TargetStateChangedData) -> None:
|
||||
"""Handle state change events."""
|
||||
|
||||
async def on_entities_update(added: set[str], removed: set[str]) -> None:
|
||||
nonlocal cancelled
|
||||
started.set()
|
||||
try:
|
||||
await release.wait()
|
||||
except asyncio.CancelledError:
|
||||
cancelled = True
|
||||
raise
|
||||
|
||||
entity_reg = er.async_get(hass)
|
||||
label_reg = lr.async_get(hass)
|
||||
label = label_reg.async_create("Cancel Test")
|
||||
entity = entity_reg.async_get_or_create(
|
||||
domain="light", platform="test", unique_id="cancel_a"
|
||||
)
|
||||
hass.states.async_set(entity.entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# No entity has the label yet, so the awaited initial update does not fire.
|
||||
unsub = await target.async_track_target_selector_state_change_event(
|
||||
hass,
|
||||
{ATTR_LABEL_ID: label.label_id},
|
||||
state_change_callback,
|
||||
on_entities_update=on_entities_update,
|
||||
)
|
||||
|
||||
# Registry change starts the update task, which blocks indefinitely.
|
||||
entity_reg.async_update_entity(entity.entity_id, labels={label.label_id})
|
||||
await started.wait()
|
||||
|
||||
# Unsubscribing cancels the in-flight task.
|
||||
unsub()
|
||||
await asyncio.sleep(0)
|
||||
|
||||
assert cancelled is True
|
||||
# A cancellation is not an error.
|
||||
assert "Error handling tracked entities update" not in caplog.text
|
||||
|
||||
# Drain (a no-op once cancelled; releases the task if cancellation regressed).
|
||||
release.set()
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
async def test_async_track_target_selector_no_on_entities_update(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
@@ -904,7 +1008,7 @@ async def test_async_track_target_selector_no_on_entities_update(
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# No on_entities_update — should work without errors
|
||||
unsub = target.async_track_target_selector_state_change_event(
|
||||
unsub = await target.async_track_target_selector_state_change_event(
|
||||
hass,
|
||||
{ATTR_ENTITY_ID: entity_id},
|
||||
state_change_callback,
|
||||
|
||||
Reference in New Issue
Block a user