From ffc23a5dda826512d325573feacbff01738cc06e Mon Sep 17 00:00:00 2001 From: abmantis Date: Thu, 31 Jul 2025 20:20:49 +0100 Subject: [PATCH] Add light state condition --- homeassistant/components/light/condition.py | 125 ++++++++++ .../components/light/conditions.yaml | 18 ++ homeassistant/components/light/icons.json | 5 + homeassistant/components/light/strings.json | 17 ++ tests/components/light/test_condition.py | 227 ++++++++++++++++++ 5 files changed, 392 insertions(+) create mode 100644 homeassistant/components/light/condition.py create mode 100644 homeassistant/components/light/conditions.yaml create mode 100644 tests/components/light/test_condition.py diff --git a/homeassistant/components/light/condition.py b/homeassistant/components/light/condition.py new file mode 100644 index 00000000000..9a0b993a306 --- /dev/null +++ b/homeassistant/components/light/condition.py @@ -0,0 +1,125 @@ +"""Provides conditions for lights.""" + +from typing import Final + +import voluptuous as vol + +from homeassistant.const import CONF_CONDITION, CONF_STATE, STATE_OFF, STATE_ON +from homeassistant.core import HomeAssistant, split_entity_id +from homeassistant.helpers import config_validation as cv, target +from homeassistant.helpers.condition import ( + Condition, + ConditionCheckerType, + trace_condition_function, +) +from homeassistant.helpers.typing import ConfigType, TemplateVarsType + +from .const import DOMAIN + +ATTR_BEHAVIOR: Final = "behavior" +BEHAVIOR_ONE: Final = "one" +BEHAVIOR_ANY: Final = "any" +BEHAVIOR_ALL: Final = "all" + +STATE_CONDITION_TYPE = f"{DOMAIN}.state" +STATE_CONDITION_SCHEMA = vol.All( + { + **cv.CONDITION_BASE_SCHEMA, + vol.Required(CONF_CONDITION): STATE_CONDITION_TYPE, + vol.Required(CONF_STATE): vol.In([STATE_ON, STATE_OFF]), + vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In( + [BEHAVIOR_ONE, BEHAVIOR_ANY, BEHAVIOR_ALL] + ), + **cv.ENTITY_SERVICE_FIELDS, + }, + cv.has_at_least_one_key(*cv.ENTITY_SERVICE_FIELDS), +) + + +class StateCondition(Condition): + """State condition.""" + + def __init__(self, hass: HomeAssistant, config: ConfigType) -> None: + """Initialize condition.""" + self._hass = hass + self._config = config + + @classmethod + async def async_validate_condition_config( + cls, hass: HomeAssistant, config: ConfigType + ) -> ConfigType: + """Validate config.""" + return STATE_CONDITION_SCHEMA(config) # type: ignore[no-any-return] + + async def async_condition_from_config(self) -> ConditionCheckerType: + """Wrap action method with zone based condition.""" + state = self._config[CONF_STATE] + behavior = self._config.get(ATTR_BEHAVIOR) + + def check_any_match_state(entity_ids: set[str]) -> bool: + """Test if any entity match the state.""" + return any( + entity_state.state == state + for entity_id in entity_ids + if (entity_state := self._hass.states.get(entity_id)) + is not None # Ignore unavailable entities + ) + + def check_all_match_state(entity_ids: set[str]) -> bool: + """Test if all entities match the state.""" + return all( + entity_state.state == state + for entity_id in entity_ids + if (entity_state := self._hass.states.get(entity_id)) + is not None # Ignore unavailable entities + ) + + def check_one_match_state(entity_ids: set[str]) -> bool: + """Check that only one entity matches the state.""" + matched = False + for entity_id in entity_ids: + # Ignore unavailable entities + if (entity_state := self._hass.states.get(entity_id)) is None: + continue + if entity_state.state != state: + continue + if matched: + return False + matched = True + return matched + + if behavior == BEHAVIOR_ANY: + matcher = check_any_match_state + elif behavior == BEHAVIOR_ALL: + matcher = check_all_match_state + elif behavior == BEHAVIOR_ONE: + matcher = check_one_match_state + + @trace_condition_function + def test_state(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool: + """Test state condition.""" + selector_data = target.TargetSelectorData(self._config) + targeted_entities = target.async_extract_referenced_entity_ids( + hass, selector_data, expand_group=False + ) + referenced_entity_ids = targeted_entities.referenced.union( + targeted_entities.indirectly_referenced + ) + light_entity_ids = { + entity_id + for entity_id in referenced_entity_ids + if split_entity_id(entity_id)[0] == DOMAIN + } + return matcher(light_entity_ids) + + return test_state + + +CONDITIONS: dict[str, type[Condition]] = { + STATE_CONDITION_TYPE: StateCondition, +} + + +async def async_get_conditions(hass: HomeAssistant) -> dict[str, type[Condition]]: + """Return the light conditions.""" + return CONDITIONS diff --git a/homeassistant/components/light/conditions.yaml b/homeassistant/components/light/conditions.yaml new file mode 100644 index 00000000000..45610329c36 --- /dev/null +++ b/homeassistant/components/light/conditions.yaml @@ -0,0 +1,18 @@ +state: + target: + entity: + domain: light + fields: + state: + required: true + selector: + state: + behavior: + required: true + default: any + selector: + select: + options: + - one + - all + - any diff --git a/homeassistant/components/light/icons.json b/homeassistant/components/light/icons.json index c0b478e895d..08b19333f75 100644 --- a/homeassistant/components/light/icons.json +++ b/homeassistant/components/light/icons.json @@ -1,4 +1,9 @@ { + "conditions": { + "state": { + "condition": "mdi:state-machine" + } + }, "entity_component": { "_": { "default": "mdi:lightbulb", diff --git a/homeassistant/components/light/strings.json b/homeassistant/components/light/strings.json index 7a53f2569e7..3e12db4f228 100644 --- a/homeassistant/components/light/strings.json +++ b/homeassistant/components/light/strings.json @@ -37,6 +37,23 @@ "field_xy_color_name": "XY-color", "section_advanced_fields_name": "Advanced options" }, + "conditions": { + "state": { + "name": "State", + "description": "If lights are in a specific state, such as on or off.", + "description_configured": "If light states match", + "fields": { + "state": { + "name": "State", + "description": "The state to match." + }, + "behavior": { + "name": "Behavior", + "description": "How the state should match on the targeted lights." + } + } + } + }, "device_automation": { "action_type": { "brightness_decrease": "Decrease {entity_name} brightness", diff --git a/tests/components/light/test_condition.py b/tests/components/light/test_condition.py new file mode 100644 index 00000000000..77d079bcc8d --- /dev/null +++ b/tests/components/light/test_condition.py @@ -0,0 +1,227 @@ +"""Test light conditions.""" + +import pytest + +from homeassistant.components import automation +from homeassistant.const import ( + ATTR_LABEL_ID, + CONF_CONDITION, + CONF_STATE, + STATE_OFF, + STATE_ON, + STATE_UNAVAILABLE, +) +from homeassistant.core import HomeAssistant, ServiceCall +from homeassistant.helpers import entity_registry as er, label_registry as lr +from homeassistant.setup import async_setup_component + +from tests.common import MockConfigEntry + + +@pytest.fixture(autouse=True, name="stub_blueprint_populate") +def stub_blueprint_populate_autouse(stub_blueprint_populate: None) -> None: + """Stub copying the blueprints to the config folder.""" + + +@pytest.fixture +async def label_lights(hass: HomeAssistant) -> None: + """Create multiple light entities associated with labels.""" + await async_setup_component(hass, "light", {}) + + config_entry = MockConfigEntry(domain="test_labels") + config_entry.add_to_hass(hass) + + label_reg = lr.async_get(hass) + label = label_reg.async_create("Test Label") + + entity_reg = er.async_get(hass) + + for i in range(3): + light_entity = entity_reg.async_get_or_create( + domain="light", + platform="test", + unique_id=f"label_light_{i}", + suggested_object_id=f"label_light_{i}", + ) + entity_reg.async_update_entity(light_entity.entity_id, labels={label.label_id}) + + return [ + "light.label_light_0", + "light.label_light_1", + "light.label_light_2", + ] + + +async def has_calls_after_trigger( + hass: HomeAssistant, service_calls: list[ServiceCall] +) -> bool: + """Check if there are service calls after the trigger event.""" + hass.bus.async_fire("test_event") + await hass.async_block_till_done() + has_calls = len(service_calls) == 1 + service_calls.clear() + return has_calls + + +@pytest.mark.parametrize("condition_state", [STATE_ON, STATE_OFF]) +async def test_light_state_condition_behavior_one( + hass: HomeAssistant, + service_calls: list[ServiceCall], + label_lights: list[str], + condition_state: str, +) -> None: + """Test the light state condition with the 'one' behavior.""" + await async_setup_component(hass, "light", {}) + + reverse_state = STATE_OFF if condition_state == STATE_ON else STATE_ON + for entity_id in label_lights: + hass.states.async_set(entity_id, reverse_state) + + await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: { + "trigger": {"platform": "event", "event_type": "test_event"}, + "condition": { + CONF_CONDITION: "light.state", + ATTR_LABEL_ID: "test_label", + "behavior": "one", + CONF_STATE: condition_state, + }, + "action": { + "service": "test.automation", + }, + } + }, + ) + + # No lights on the condition state + assert not await has_calls_after_trigger(hass, service_calls) + + # Set one light to the condition state -> condition pass + hass.states.async_set(label_lights[0], condition_state) + assert await has_calls_after_trigger(hass, service_calls) + + # Set second light to the condition state -> condition fail + hass.states.async_set(label_lights[1], condition_state) + assert not await has_calls_after_trigger(hass, service_calls) + + # Set first light to unavailable -> condition pass again since only the + # second light is on the condition state + hass.states.async_set(label_lights[0], STATE_UNAVAILABLE) + assert await has_calls_after_trigger(hass, service_calls) + + # Set all lights to unavailable -> condition fail + for entity_id in label_lights: + hass.states.async_set(entity_id, STATE_UNAVAILABLE) + assert not await has_calls_after_trigger(hass, service_calls) + + +@pytest.mark.parametrize("condition_state", [STATE_ON, STATE_OFF]) +async def test_light_state_condition_behavior_any( + hass: HomeAssistant, + service_calls: list[ServiceCall], + label_lights: list[str], + condition_state: str, +) -> None: + """Test the light state condition with the 'any' behavior.""" + await async_setup_component(hass, "light", {}) + + reverse_state = STATE_OFF if condition_state == STATE_ON else STATE_ON + for entity_id in label_lights: + hass.states.async_set(entity_id, reverse_state) + + await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: { + "trigger": {"platform": "event", "event_type": "test_event"}, + "condition": { + CONF_CONDITION: "light.state", + ATTR_LABEL_ID: "test_label", + "behavior": "any", + CONF_STATE: condition_state, + }, + "action": { + "service": "test.automation", + }, + } + }, + ) + + # No lights on the condition state + assert not await has_calls_after_trigger(hass, service_calls) + + # Set one light to the condition state -> condition pass + hass.states.async_set(label_lights[0], condition_state) + assert await has_calls_after_trigger(hass, service_calls) + + # Set all lights to the condition state -> condition pass + for entity_id in label_lights: + hass.states.async_set(entity_id, condition_state) + assert await has_calls_after_trigger(hass, service_calls) + + # Set one light to unavailable -> condition pass + hass.states.async_set(label_lights[0], STATE_UNAVAILABLE) + assert await has_calls_after_trigger(hass, service_calls) + + # Set all lights to unavailable -> condition fail + for entity_id in label_lights: + hass.states.async_set(entity_id, STATE_UNAVAILABLE) + assert not await has_calls_after_trigger(hass, service_calls) + + +@pytest.mark.parametrize("condition_state", [STATE_ON, STATE_OFF]) +async def test_light_state_condition_behavior_all( + hass: HomeAssistant, + service_calls: list[ServiceCall], + label_lights: list[str], + condition_state: str, +) -> None: + """Test the light state condition with the 'all' behavior.""" + await async_setup_component(hass, "light", {}) + + reverse_state = STATE_OFF if condition_state == STATE_ON else STATE_ON + for entity_id in label_lights: + hass.states.async_set(entity_id, reverse_state) + + await async_setup_component( + hass, + automation.DOMAIN, + { + automation.DOMAIN: { + "trigger": {"platform": "event", "event_type": "test_event"}, + "condition": { + CONF_CONDITION: "light.state", + ATTR_LABEL_ID: "test_label", + "behavior": "all", + CONF_STATE: condition_state, + }, + "action": { + "service": "test.automation", + }, + } + }, + ) + + # No lights on the condition state + assert not await has_calls_after_trigger(hass, service_calls) + + # Set one light to the condition state -> condition fail + hass.states.async_set(label_lights[0], condition_state) + assert not await has_calls_after_trigger(hass, service_calls) + + # Set all lights to the condition state -> condition pass + for entity_id in label_lights: + hass.states.async_set(entity_id, condition_state) + assert await has_calls_after_trigger(hass, service_calls) + + # Set one light to unavailable -> condition should still pass + hass.states.async_set(label_lights[0], STATE_UNAVAILABLE) + + # Set all lights to unavailable -> condition fail + for entity_id in label_lights: + hass.states.async_set(entity_id, STATE_UNAVAILABLE) + assert not await has_calls_after_trigger(hass, service_calls)