Compare commits

...

1 Commits

Author SHA1 Message Date
Erik a43fbeffbd Add tests showing races in entity triggers 2026-06-11 08:43:52 +02:00
+695 -3
View File
@@ -20,12 +20,14 @@ from homeassistant.components.tag import DOMAIN as TAG_DOMAIN
from homeassistant.components.text import DOMAIN as TEXT_DOMAIN
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_LABEL_ID,
ATTR_UNIT_OF_MEASUREMENT,
CONF_ENTITY_ID,
CONF_FOR,
CONF_OPTIONS,
CONF_PLATFORM,
CONF_TARGET,
EVENT_STATE_CHANGED,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
@@ -35,13 +37,20 @@ from homeassistant.const import (
from homeassistant.core import (
CALLBACK_TYPE,
Context,
Event,
EventStateChangedData,
HomeAssistant,
ServiceCall,
State,
callback,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, trigger
from homeassistant.helpers import (
config_validation as cv,
entity_registry as er,
label_registry as lr,
trigger,
)
from homeassistant.helpers.automation import (
DomainSpec,
move_top_level_schema_fields_to_options,
@@ -2121,6 +2130,77 @@ async def test_numerical_state_attribute_changed_trigger_thresholds(
assert len(service_calls) == (1 if expected_fires else 0)
async def test_numerical_state_trigger_threshold_entity_same_loop_iteration(
hass: HomeAssistant, service_calls: list[ServiceCall]
) -> None:
"""Test threshold entity changing in the same loop iteration.
The threshold is read from the live state machine when the state change
event is evaluated, one event loop iteration after it was fired. We do
not guarantee exact sequencing between the threshold entity and the
targeted entity: a threshold change in the same iteration as a tracked
value change is applied to the evaluation of that change, even though
the threshold changed after the tracked value.
"""
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
return {
"attribute_changed": make_entity_numerical_state_changed_trigger(
{"test": DomainSpec(value_source="test_attribute")}
),
}
mock_integration(hass, MockModule("test"))
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
hass.states.async_set("sensor.threshold", "100")
hass.states.async_set("test.test_entity", "on", {"test_attribute": 50})
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
CONF_PLATFORM: "test.attribute_changed",
CONF_TARGET: {CONF_ENTITY_ID: "test.test_entity"},
CONF_OPTIONS: {
"threshold": {
"type": "above",
"value": {"entity": "sensor.threshold"},
}
},
},
"action": {
"service": "test.automation",
"data_template": {CONF_ENTITY_ID: "{{ trigger.entity_id }}"},
},
}
},
)
assert len(service_calls) == 0
# The tracked value rises above the threshold (100) and the threshold
# rises above the tracked value in the same event loop iteration. The
# value change is evaluated against the live threshold (200), so the
# trigger does not fire, even though the threshold was 100 when the
# value changed.
hass.states.async_set("test.test_entity", "on", {"test_attribute": 150})
hass.states.async_set("sensor.threshold", "200")
await hass.async_block_till_done()
assert len(service_calls) == 0
# The tracked value changes but stays below the threshold (200), and
# the threshold drops below the tracked value in the same event loop
# iteration. The value change is evaluated against the live threshold
# (100), so the trigger fires, even though the threshold was 200 when
# the value changed. Threshold changes alone never fire the trigger.
hass.states.async_set("test.test_entity", "on", {"test_attribute": 190})
hass.states.async_set("sensor.threshold", "100")
await hass.async_block_till_done()
assert len(service_calls) == 1
async def test_numerical_state_attribute_changed_entity_limit_unit_validation(
hass: HomeAssistant, service_calls: list[ServiceCall]
) -> None:
@@ -3812,8 +3892,13 @@ async def _arm_off_to_on_trigger(
behavior: str,
calls: list[dict[str, Any]],
duration: dict[str, int] | None,
target: dict[str, Any] | None = None,
) -> CALLBACK_TYPE:
"""Set up _OffToOnTrigger via async_initialize_triggers."""
"""Set up _OffToOnTrigger via async_initialize_triggers.
The trigger targets `entity_ids` directly, unless an explicit `target`
configuration is given.
"""
async def async_get_triggers(
hass: HomeAssistant,
@@ -3829,7 +3914,7 @@ async def _arm_off_to_on_trigger(
trigger_config = {
CONF_PLATFORM: "test.off_to_on",
CONF_TARGET: {CONF_ENTITY_ID: entity_ids},
CONF_TARGET: target if target is not None else {CONF_ENTITY_ID: entity_ids},
CONF_OPTIONS: options,
}
@@ -3957,6 +4042,174 @@ async def test_entity_trigger_all_requires_all(
unsub()
async def test_entity_trigger_first_same_loop_iteration(
hass: HomeAssistant,
) -> None:
"""Test behavior first when two entities change in the same loop iteration.
The trigger's state change listener is dispatched via loop.call_soon, one
event loop iteration after the state change event is fired. If a second
tracked entity changes state in the same iteration as the first, both
events are evaluated against the live state machine, which already
includes both changes.
This test documents existing unwanted behavior: entity_a was the first
entity to turn on, so the trigger should fire exactly once for entity_a,
but both events count two matching entities and the trigger never fires.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration=None
)
# Both entities turn on in the same event loop iteration, before the
# trigger's deferred listener has a chance to run.
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Unwanted: the trigger should fire exactly once, for entity_a, which
# was the first entity to turn on.
assert len(calls) == 0
unsub()
async def test_entity_trigger_first_no_fire_on_swap_same_loop_iteration(
hass: HomeAssistant,
) -> None:
"""Test behavior first when entities swap states in the same loop iteration.
When one entity is already on and, within a single event loop iteration,
a second entity turns on and the first turns off, the number of matching
entities goes 1 -> 2 -> 1 and never reaches zero, so the trigger should
not fire for the second entity: it was not the first to turn on.
This test documents existing unwanted behavior: entity_b's event is
evaluated against the live state machine, which already shows entity_a
off, counting exactly one match — and the trigger fires spuriously.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration=None
)
# A turns on first — the trigger fires for it.
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 1
calls.clear()
# B turns on and A turns off in the same event loop iteration.
hass.states.async_set(entity_b, STATE_ON)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Unwanted: B was never the first matching entity, the trigger should
# not fire.
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_all_same_loop_iteration(
hass: HomeAssistant,
) -> None:
"""Test behavior all when two entities change in the same loop iteration.
Both entities turn on in a single event loop iteration. Only the second
event completes the all-match, so the trigger should fire exactly once,
for the second entity.
This test documents existing unwanted behavior: both events are
evaluated against the live state machine, which already shows both
entities on, and the trigger fires twice.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_ALL, calls, duration=None
)
# Both entities turn on in the same event loop iteration, before the
# trigger's deferred listener has a chance to run.
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Unwanted: the all-match was completed by entity_b turning on, so the
# trigger should fire exactly once, for entity_b — but it fires twice.
assert len(calls) == 2
assert calls[0]["entity_id"] == entity_a
assert calls[1]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_first_resubscribe_same_loop_iteration(
hass: HomeAssistant, entity_registry: er.EntityRegistry
) -> None:
"""Test behavior first when a registry update causes resubscription.
A registry update makes the target tracker resubscribe its state change
listener. A state change event fired in the same event loop iteration,
but not yet dispatched, should still be delivered to the trigger.
This test documents existing unwanted behavior: the tracker unsubscribes
its old listener before subscribing the new one, and as the only
subscriber this tears down the shared state change tracker — dropping
the event which was fired but not yet dispatched. The trigger never
fires: entity_a's event is lost, and entity_b's event counts two
matching entities in the live state machine.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration=None
)
# entity_a turns on and an unrelated registry entry is created in the
# same event loop iteration. The registry update makes the target tracker
# resubscribe before entity_a's state change event is dispatched.
hass.states.async_set(entity_a, STATE_ON)
entity_registry.async_get_or_create("test", "test", "unrelated")
await hass.async_block_till_done()
# Unwanted: entity_a was the first entity to turn on, the trigger
# should fire — but its event was dropped during resubscription.
assert len(calls) == 0
# entity_b turning on must not fire the trigger: it is not the first.
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_first_requires_exactly_one(
hass: HomeAssistant,
) -> None:
@@ -4225,6 +4478,445 @@ async def test_entity_trigger_duration_each_entity_off_cancels_only_that_entity(
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_FIRST, BEHAVIOR_ALL])
async def test_entity_trigger_blip_same_loop_iteration(
hass: HomeAssistant, behavior: str
) -> None:
"""Test a same-iteration blip (off→on→off) without a duration.
The on-event should fire the trigger — consistent with behavior each
and with the same blip spread over multiple iterations.
This test documents existing unwanted behavior: the on-event is
evaluated against the live state machine, which already shows the
entity off again, so the trigger never fires.
"""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration=None
)
hass.states.async_set(entity_id, STATE_ON)
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
# Unwanted: the trigger should fire once, for the on-event.
assert len(calls) == 0
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_FIRST, BEHAVIOR_ALL])
async def test_entity_trigger_blip_same_loop_iteration_with_duration(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test a same-iteration blip (off→on→off) with a duration.
The state did not hold for the duration, so the trigger must not fire.
(Currently the duration timer is not even armed: the on-event is
evaluated against the live state machine, which already shows the
entity off again.)
"""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
hass.states.async_set(entity_id, STATE_ON)
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
# Advance past the duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
@pytest.mark.parametrize(
("behavior", "expected_calls"),
[
pytest.param(BEHAVIOR_EACH, 0, id="each"),
pytest.param(BEHAVIOR_FIRST, 1, id="first"),
pytest.param(BEHAVIOR_ALL, 1, id="all"),
],
)
async def test_entity_trigger_duration_cancelled_by_dip_same_loop_iteration(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
behavior: str,
expected_calls: int,
) -> None:
"""Test a same-iteration dip through unavailable cancels the timer.
The dip breaks the continuity of the matching state, and the recovery
event cannot restart the timer because the transition out of
unavailable is excluded, so the trigger should never fire.
For behavior each the cancel check uses the dip event's own state and
the timer is correctly cancelled. For first/all this test documents
existing unwanted behavior: the cancel check counts matches against
the live state machine, which already shows the recovery, so the timer
survives and the trigger fires.
"""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Dip through unavailable and recover in the same event loop iteration
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_id, STATE_UNAVAILABLE)
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# Advance past the original duration — the trigger should not fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == expected_calls
unsub()
async def test_entity_trigger_duration_cancelled_after_resubscription(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
entity_registry: er.EntityRegistry,
) -> None:
"""Test the duration cancel check after the target tracker resubscribed.
After a registry-driven resubscription the tracker's state change
listener runs after the duration cancel listener, so the cancel check
cannot rely on the tracked states view alone already including the
event being checked.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# An unrelated registry entry makes the target tracker resubscribe
entity_registry.async_get_or_create("test", "test", "unrelated")
await hass.async_block_till_done()
# Turning A off must cancel the timer
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past the original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_first_match_handover_after_resubscription(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
entity_registry: er.EntityRegistry,
) -> None:
"""Test the duration check sees states changed after a resubscription.
While the timer is waiting, the target tracker resubscribes due to a
registry update, then a second entity turns on and the first turns off.
At least one entity matches throughout, so the timer must survive and
the trigger fire. The cancel check evaluates the second entity through
the targeted states mapping captured when the timer was armed, so the
mapping must keep receiving updates across the resubscription.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# An unrelated registry entry makes the target tracker resubscribe
entity_registry.async_get_or_create("test", "test", "unrelated")
await hass.async_block_till_done()
# The match hands over from A to B: at least one entity matches at all
# times, so the timer keeps running.
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past the duration — the trigger fires for the arming event
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_a
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_EACH, BEHAVIOR_FIRST, BEHAVIOR_ALL])
async def test_entity_trigger_duration_not_cancelled_by_attribute_change(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test an attribute-only change mid-wait does not cancel the timer.
The entity's state stays valid across the state change event, so the
pending duration timer must keep running and fire at its original
deadline; only a change away from a matching state cancels.
"""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# An attribute-only change keeps the state valid
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_id, STATE_ON, {"brightness": 10})
await hass.async_block_till_done()
assert len(calls) == 0
# The timer fires at the original deadline
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_id
unsub()
async def test_entity_trigger_duration_each_cancelled_when_entity_leaves_target(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
entity_registry: er.EntityRegistry,
) -> None:
"""Test an each duration timer when its entity is untargeted mid-wait.
A pending `for:` wait should not outlive the entity's membership of the
target: when a registry change removes the entity from the target, the
timer should be cancelled.
This test documents existing unwanted behavior: the duration timer
keeps running and the trigger fires for an entity which is no longer
targeted.
"""
label_registry = lr.async_get(hass)
label = label_registry.async_create("Test Each Removal")
entry = entity_registry.async_get_or_create("test", "test", "labeled")
entity_registry.async_update_entity(entry.entity_id, labels={label.label_id})
hass.states.async_set(entry.entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass,
[],
BEHAVIOR_EACH,
calls,
duration={"seconds": 5},
target={ATTR_LABEL_ID: label.label_id},
)
hass.states.async_set(entry.entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Removing the label removes the entity from the target mid-wait
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
entity_registry.async_update_entity(entry.entity_id, labels=set())
await hass.async_block_till_done()
# Advance past the original duration. Unwanted: the trigger fires for
# the no-longer-targeted entity.
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entry.entity_id
unsub()
async def test_entity_trigger_duration_all_survives_entity_leaving_target(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
entity_registry: er.EntityRegistry,
) -> None:
"""Test a pending all timer when an entity is removed from the target.
Once an entity is removed from the target it should no longer gate the
all-match: the timer should keep running and fire if the remaining
targeted entities stay matching, even if the removed entity changes to
a non-matching state.
This test documents existing unwanted behavior: the duration cancel
check still tracks the entity set frozen when the timer was armed, so
the removed entity turning off cancels the timer and the trigger does
not fire.
"""
label_registry = lr.async_get(hass)
label = label_registry.async_create("Test All Removal")
entry_a = entity_registry.async_get_or_create("test", "test", "labeled_a")
entry_b = entity_registry.async_get_or_create("test", "test", "labeled_b")
entity_registry.async_update_entity(entry_a.entity_id, labels={label.label_id})
entity_registry.async_update_entity(entry_b.entity_id, labels={label.label_id})
hass.states.async_set(entry_a.entity_id, STATE_OFF)
hass.states.async_set(entry_b.entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass,
[],
BEHAVIOR_ALL,
calls,
duration={"seconds": 5},
target={ATTR_LABEL_ID: label.label_id},
)
hass.states.async_set(entry_a.entity_id, STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(entry_b.entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# B leaves the target mid-wait and turns off: it should no longer gate
# the all-match.
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
entity_registry.async_update_entity(entry_b.entity_id, labels=set())
await hass.async_block_till_done()
hass.states.async_set(entry_b.entity_id, STATE_OFF)
await hass.async_block_till_done()
# Unwanted: the remaining targeted entity stayed on for the duration,
# so the trigger should fire — but the no-longer-targeted entity
# cancelled the timer.
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_first_nested_state_revert(
hass: HomeAssistant,
) -> None:
"""Test a synchronous bus listener reverting a state change.
Writing states from a synchronous bus listener during state change
dispatch is not supported: the nested state write is dispatched to the
target tracker before the event that caused it, inverting per-entity
delivery order. Supported state change tracking via
async_track_state_change_event or async_track_state_change_filtered is
deferred precisely so callbacks cannot run inside the dispatch loop and
cause this.
This test documents the resulting behavior rather than guaranteeing it:
both of entity_a's events are evaluated against the live state machine,
which already shows the entity off again, so the trigger does not fire
for the blip; entity_b turning on later counts as the only match and
fires.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
@callback
def revert_entity_a(event: Event[EventStateChangedData]) -> None:
"""Synchronously turn entity_a off again when it turns on."""
if (
event.data["entity_id"] == entity_a
and (new_state := event.data["new_state"]) is not None
and new_state.state == STATE_ON
):
hass.states.async_set(entity_a, STATE_OFF)
# Registered before the trigger is armed, so it runs before the state
# change tracker's bus listener and its nested write is dispatched to
# the tracker first.
unsub_revert = hass.bus.async_listen(EVENT_STATE_CHANGED, revert_entity_a)
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration=None
)
# entity_a turns on and is synchronously reverted to off. The trigger
# receives (on→off) then (off→on); the on-event counts no matches in
# the live state machine and the trigger does not fire.
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# entity_a is off, so entity_b is the first matching entity and fires.
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_b
unsub()
unsub_revert()
async def test_entity_trigger_duration_all_requires_all(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None: