Add light state condition

This commit is contained in:
abmantis
2025-07-31 20:20:49 +01:00
parent ff7c125334
commit ffc23a5dda
5 changed files with 392 additions and 0 deletions

View File

@@ -0,0 +1,125 @@
"""Provides conditions for lights."""
from typing import Final
import voluptuous as vol
from homeassistant.const import CONF_CONDITION, CONF_STATE, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import config_validation as cv, target
from homeassistant.helpers.condition import (
Condition,
ConditionCheckerType,
trace_condition_function,
)
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
from .const import DOMAIN
ATTR_BEHAVIOR: Final = "behavior"
BEHAVIOR_ONE: Final = "one"
BEHAVIOR_ANY: Final = "any"
BEHAVIOR_ALL: Final = "all"
STATE_CONDITION_TYPE = f"{DOMAIN}.state"
STATE_CONDITION_SCHEMA = vol.All(
{
**cv.CONDITION_BASE_SCHEMA,
vol.Required(CONF_CONDITION): STATE_CONDITION_TYPE,
vol.Required(CONF_STATE): vol.In([STATE_ON, STATE_OFF]),
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
[BEHAVIOR_ONE, BEHAVIOR_ANY, BEHAVIOR_ALL]
),
**cv.ENTITY_SERVICE_FIELDS,
},
cv.has_at_least_one_key(*cv.ENTITY_SERVICE_FIELDS),
)
class StateCondition(Condition):
"""State condition."""
def __init__(self, hass: HomeAssistant, config: ConfigType) -> None:
"""Initialize condition."""
self._hass = hass
self._config = config
@classmethod
async def async_validate_condition_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return STATE_CONDITION_SCHEMA(config) # type: ignore[no-any-return]
async def async_condition_from_config(self) -> ConditionCheckerType:
"""Wrap action method with zone based condition."""
state = self._config[CONF_STATE]
behavior = self._config.get(ATTR_BEHAVIOR)
def check_any_match_state(entity_ids: set[str]) -> bool:
"""Test if any entity match the state."""
return any(
entity_state.state == state
for entity_id in entity_ids
if (entity_state := self._hass.states.get(entity_id))
is not None # Ignore unavailable entities
)
def check_all_match_state(entity_ids: set[str]) -> bool:
"""Test if all entities match the state."""
return all(
entity_state.state == state
for entity_id in entity_ids
if (entity_state := self._hass.states.get(entity_id))
is not None # Ignore unavailable entities
)
def check_one_match_state(entity_ids: set[str]) -> bool:
"""Check that only one entity matches the state."""
matched = False
for entity_id in entity_ids:
# Ignore unavailable entities
if (entity_state := self._hass.states.get(entity_id)) is None:
continue
if entity_state.state != state:
continue
if matched:
return False
matched = True
return matched
if behavior == BEHAVIOR_ANY:
matcher = check_any_match_state
elif behavior == BEHAVIOR_ALL:
matcher = check_all_match_state
elif behavior == BEHAVIOR_ONE:
matcher = check_one_match_state
@trace_condition_function
def test_state(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
"""Test state condition."""
selector_data = target.TargetSelectorData(self._config)
targeted_entities = target.async_extract_referenced_entity_ids(
hass, selector_data, expand_group=False
)
referenced_entity_ids = targeted_entities.referenced.union(
targeted_entities.indirectly_referenced
)
light_entity_ids = {
entity_id
for entity_id in referenced_entity_ids
if split_entity_id(entity_id)[0] == DOMAIN
}
return matcher(light_entity_ids)
return test_state
CONDITIONS: dict[str, type[Condition]] = {
STATE_CONDITION_TYPE: StateCondition,
}
async def async_get_conditions(hass: HomeAssistant) -> dict[str, type[Condition]]:
"""Return the light conditions."""
return CONDITIONS

View File

@@ -0,0 +1,18 @@
state:
target:
entity:
domain: light
fields:
state:
required: true
selector:
state:
behavior:
required: true
default: any
selector:
select:
options:
- one
- all
- any

View File

@@ -1,4 +1,9 @@
{
"conditions": {
"state": {
"condition": "mdi:state-machine"
}
},
"entity_component": {
"_": {
"default": "mdi:lightbulb",

View File

@@ -37,6 +37,23 @@
"field_xy_color_name": "XY-color",
"section_advanced_fields_name": "Advanced options"
},
"conditions": {
"state": {
"name": "State",
"description": "If lights are in a specific state, such as on or off.",
"description_configured": "If light states match",
"fields": {
"state": {
"name": "State",
"description": "The state to match."
},
"behavior": {
"name": "Behavior",
"description": "How the state should match on the targeted lights."
}
}
}
},
"device_automation": {
"action_type": {
"brightness_decrease": "Decrease {entity_name} brightness",

View File

@@ -0,0 +1,227 @@
"""Test light conditions."""
import pytest
from homeassistant.components import automation
from homeassistant.const import (
ATTR_LABEL_ID,
CONF_CONDITION,
CONF_STATE,
STATE_OFF,
STATE_ON,
STATE_UNAVAILABLE,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.helpers import entity_registry as er, label_registry as lr
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry
@pytest.fixture(autouse=True, name="stub_blueprint_populate")
def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None:
"""Stub copying the blueprints to the config folder."""
@pytest.fixture
async def label_lights(hass: HomeAssistant) -> None:
"""Create multiple light entities associated with labels."""
await async_setup_component(hass, "light", {})
config_entry = MockConfigEntry(domain="test_labels")
config_entry.add_to_hass(hass)
label_reg = lr.async_get(hass)
label = label_reg.async_create("Test Label")
entity_reg = er.async_get(hass)
for i in range(3):
light_entity = entity_reg.async_get_or_create(
domain="light",
platform="test",
unique_id=f"label_light_{i}",
suggested_object_id=f"label_light_{i}",
)
entity_reg.async_update_entity(light_entity.entity_id, labels={label.label_id})
return [
"light.label_light_0",
"light.label_light_1",
"light.label_light_2",
]
async def has_calls_after_trigger(
hass: HomeAssistant, service_calls: list[ServiceCall]
) -> bool:
"""Check if there are service calls after the trigger event."""
hass.bus.async_fire("test_event")
await hass.async_block_till_done()
has_calls = len(service_calls) == 1
service_calls.clear()
return has_calls
@pytest.mark.parametrize("condition_state", [STATE_ON, STATE_OFF])
async def test_light_state_condition_behavior_one(
hass: HomeAssistant,
service_calls: list[ServiceCall],
label_lights: list[str],
condition_state: str,
) -> None:
"""Test the light state condition with the 'one' behavior."""
await async_setup_component(hass, "light", {})
reverse_state = STATE_OFF if condition_state == STATE_ON else STATE_ON
for entity_id in label_lights:
hass.states.async_set(entity_id, reverse_state)
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {"platform": "event", "event_type": "test_event"},
"condition": {
CONF_CONDITION: "light.state",
ATTR_LABEL_ID: "test_label",
"behavior": "one",
CONF_STATE: condition_state,
},
"action": {
"service": "test.automation",
},
}
},
)
# No lights on the condition state
assert not await has_calls_after_trigger(hass, service_calls)
# Set one light to the condition state -> condition pass
hass.states.async_set(label_lights[0], condition_state)
assert await has_calls_after_trigger(hass, service_calls)
# Set second light to the condition state -> condition fail
hass.states.async_set(label_lights[1], condition_state)
assert not await has_calls_after_trigger(hass, service_calls)
# Set first light to unavailable -> condition pass again since only the
# second light is on the condition state
hass.states.async_set(label_lights[0], STATE_UNAVAILABLE)
assert await has_calls_after_trigger(hass, service_calls)
# Set all lights to unavailable -> condition fail
for entity_id in label_lights:
hass.states.async_set(entity_id, STATE_UNAVAILABLE)
assert not await has_calls_after_trigger(hass, service_calls)
@pytest.mark.parametrize("condition_state", [STATE_ON, STATE_OFF])
async def test_light_state_condition_behavior_any(
hass: HomeAssistant,
service_calls: list[ServiceCall],
label_lights: list[str],
condition_state: str,
) -> None:
"""Test the light state condition with the 'any' behavior."""
await async_setup_component(hass, "light", {})
reverse_state = STATE_OFF if condition_state == STATE_ON else STATE_ON
for entity_id in label_lights:
hass.states.async_set(entity_id, reverse_state)
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {"platform": "event", "event_type": "test_event"},
"condition": {
CONF_CONDITION: "light.state",
ATTR_LABEL_ID: "test_label",
"behavior": "any",
CONF_STATE: condition_state,
},
"action": {
"service": "test.automation",
},
}
},
)
# No lights on the condition state
assert not await has_calls_after_trigger(hass, service_calls)
# Set one light to the condition state -> condition pass
hass.states.async_set(label_lights[0], condition_state)
assert await has_calls_after_trigger(hass, service_calls)
# Set all lights to the condition state -> condition pass
for entity_id in label_lights:
hass.states.async_set(entity_id, condition_state)
assert await has_calls_after_trigger(hass, service_calls)
# Set one light to unavailable -> condition pass
hass.states.async_set(label_lights[0], STATE_UNAVAILABLE)
assert await has_calls_after_trigger(hass, service_calls)
# Set all lights to unavailable -> condition fail
for entity_id in label_lights:
hass.states.async_set(entity_id, STATE_UNAVAILABLE)
assert not await has_calls_after_trigger(hass, service_calls)
@pytest.mark.parametrize("condition_state", [STATE_ON, STATE_OFF])
async def test_light_state_condition_behavior_all(
hass: HomeAssistant,
service_calls: list[ServiceCall],
label_lights: list[str],
condition_state: str,
) -> None:
"""Test the light state condition with the 'all' behavior."""
await async_setup_component(hass, "light", {})
reverse_state = STATE_OFF if condition_state == STATE_ON else STATE_ON
for entity_id in label_lights:
hass.states.async_set(entity_id, reverse_state)
await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {"platform": "event", "event_type": "test_event"},
"condition": {
CONF_CONDITION: "light.state",
ATTR_LABEL_ID: "test_label",
"behavior": "all",
CONF_STATE: condition_state,
},
"action": {
"service": "test.automation",
},
}
},
)
# No lights on the condition state
assert not await has_calls_after_trigger(hass, service_calls)
# Set one light to the condition state -> condition fail
hass.states.async_set(label_lights[0], condition_state)
assert not await has_calls_after_trigger(hass, service_calls)
# Set all lights to the condition state -> condition pass
for entity_id in label_lights:
hass.states.async_set(entity_id, condition_state)
assert await has_calls_after_trigger(hass, service_calls)
# Set one light to unavailable -> condition should still pass
hass.states.async_set(label_lights[0], STATE_UNAVAILABLE)
# Set all lights to unavailable -> condition fail
for entity_id in label_lights:
hass.states.async_set(entity_id, STATE_UNAVAILABLE)
assert not await has_calls_after_trigger(hass, service_calls)