mirror of
https://github.com/home-assistant/core.git
synced 2026-05-28 11:43:16 +02:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d9a89beb3d | |||
| 41f783f14d | |||
| 35397b818d | |||
| d42d02f20a | |||
| 99c445f261 | |||
| 567fe85828 |
@@ -39,7 +39,7 @@ on:
|
||||
env:
|
||||
CACHE_VERSION: 3
|
||||
MYPY_CACHE_VERSION: 1
|
||||
HA_SHORT_VERSION: "2026.6"
|
||||
HA_SHORT_VERSION: "2026.7"
|
||||
ADDITIONAL_PYTHON_VERSIONS: "[]"
|
||||
# 10.3 is the oldest supported version
|
||||
# - 10.3.32 is the version currently shipped with Synology (as of 17 Feb 2022)
|
||||
|
||||
@@ -11,7 +11,7 @@ from homeassistant.helpers.hassio import is_hassio
|
||||
|
||||
from .agent import BackupAgent, LocalBackupAgent, OnProgressCallback
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .models import AgentBackup, BackupNotFound
|
||||
from .models import AgentBackup, BackupNotFound, InvalidBackupFilename
|
||||
from .util import read_backup, suggested_filename
|
||||
|
||||
|
||||
@@ -54,7 +54,13 @@ class CoreLocalBackupAgent(LocalBackupAgent):
|
||||
try:
|
||||
backup = read_backup(backup_path)
|
||||
backups[backup.backup_id] = (backup, backup_path)
|
||||
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
|
||||
except (
|
||||
OSError,
|
||||
TarError,
|
||||
json.JSONDecodeError,
|
||||
KeyError,
|
||||
InvalidBackupFilename,
|
||||
) as err:
|
||||
LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
|
||||
return backups
|
||||
|
||||
@@ -122,7 +128,14 @@ class CoreLocalBackupAgent(LocalBackupAgent):
|
||||
|
||||
def get_new_backup_path(self, backup: AgentBackup) -> Path:
|
||||
"""Return the local path to a new backup."""
|
||||
return self._backup_dir / suggested_filename(backup)
|
||||
candidate = self._backup_dir / suggested_filename(backup)
|
||||
# suggested_filename does not strip separators; refuse paths that would
|
||||
# land outside the backup directory.
|
||||
if candidate.parent != self._backup_dir:
|
||||
raise InvalidBackupFilename(
|
||||
f"Refusing to write outside {self._backup_dir}: {candidate}"
|
||||
)
|
||||
return candidate
|
||||
|
||||
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
|
||||
"""Delete a backup file."""
|
||||
|
||||
@@ -1978,7 +1978,13 @@ class CoreBackupReaderWriter(BackupReaderWriter):
|
||||
|
||||
try:
|
||||
backup = await async_add_executor_job(read_backup, temp_file)
|
||||
except (OSError, tarfile.TarError, json.JSONDecodeError, KeyError) as err:
|
||||
except (
|
||||
OSError,
|
||||
tarfile.TarError,
|
||||
json.JSONDecodeError,
|
||||
KeyError,
|
||||
InvalidBackupFilename,
|
||||
) as err:
|
||||
LOGGER.warning("Unable to parse backup %s: %s", temp_file, err)
|
||||
raise
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import copy
|
||||
from dataclasses import dataclass, replace
|
||||
from io import BytesIO
|
||||
import json
|
||||
from pathlib import Path, PurePath
|
||||
from pathlib import Path, PurePath, PureWindowsPath
|
||||
from queue import SimpleQueue
|
||||
import tarfile
|
||||
import threading
|
||||
@@ -34,7 +34,7 @@ from homeassistant.util.async_iterator import (
|
||||
from homeassistant.util.json import JsonObjectType, json_loads_object
|
||||
|
||||
from .const import BUF_SIZE, LOGGER, SECURETAR_CREATE_VERSION
|
||||
from .models import AddonInfo, AgentBackup, Folder
|
||||
from .models import AddonInfo, AgentBackup, Folder, InvalidBackupFilename
|
||||
|
||||
|
||||
class DecryptError(HomeAssistantError):
|
||||
@@ -109,6 +109,13 @@ def read_backup(backup_path: Path) -> AgentBackup:
|
||||
extra_metadata = cast(dict[str, bool | str], data.get("extra", {}))
|
||||
date = extra_metadata.get("supervisor.backup_request_date", data["date"])
|
||||
|
||||
name = cast(str, data["name"])
|
||||
# The name is used to derive the on-disk filename via suggested_filename;
|
||||
# reject anything that could escape the backup directory.
|
||||
safe_name = PureWindowsPath(name).name
|
||||
if safe_name != name or name in ("", ".", ".."):
|
||||
raise InvalidBackupFilename(f"Invalid backup name: {name!r}")
|
||||
|
||||
return AgentBackup(
|
||||
addons=addons,
|
||||
backup_id=cast(str, data["slug"]),
|
||||
@@ -118,7 +125,7 @@ def read_backup(backup_path: Path) -> AgentBackup:
|
||||
folders=folders,
|
||||
homeassistant_included=homeassistant_included,
|
||||
homeassistant_version=homeassistant_version,
|
||||
name=cast(str, data["name"]),
|
||||
name=name,
|
||||
protected=cast(bool, data.get("protected", False)),
|
||||
size=backup_path.stat().st_size,
|
||||
)
|
||||
|
||||
@@ -12,18 +12,13 @@ from homeassistant.const import (
|
||||
CONF_DOMAIN,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_EVENT,
|
||||
CONF_OPTIONS,
|
||||
CONF_PLATFORM,
|
||||
CONF_TYPE,
|
||||
CONF_ZONE,
|
||||
)
|
||||
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
|
||||
from homeassistant.helpers import config_validation as cv, entity_registry as er
|
||||
from homeassistant.helpers.trigger import (
|
||||
TriggerActionType,
|
||||
TriggerInfo,
|
||||
_async_attach_trigger_cls,
|
||||
)
|
||||
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from .const import DOMAIN
|
||||
@@ -84,18 +79,16 @@ async def async_attach_trigger(
|
||||
event = zone.EVENT_ENTER
|
||||
else:
|
||||
event = zone.EVENT_LEAVE
|
||||
zone_config = await zone.LegacyZoneTrigger.async_validate_config(
|
||||
hass,
|
||||
{
|
||||
CONF_OPTIONS: {
|
||||
CONF_ENTITY_ID: [config[CONF_ENTITY_ID]],
|
||||
CONF_ZONE: config[CONF_ZONE],
|
||||
CONF_EVENT: event,
|
||||
}
|
||||
},
|
||||
)
|
||||
return await _async_attach_trigger_cls(
|
||||
hass, zone.LegacyZoneTrigger, "device", zone_config, action, trigger_info
|
||||
|
||||
zone_config = {
|
||||
CONF_PLATFORM: ZONE_DOMAIN,
|
||||
CONF_ENTITY_ID: config[CONF_ENTITY_ID],
|
||||
CONF_ZONE: config[CONF_ZONE],
|
||||
CONF_EVENT: event,
|
||||
}
|
||||
zone_config = await zone.async_validate_trigger_config(hass, zone_config)
|
||||
return await zone.async_attach_trigger(
|
||||
hass, zone_config, action, trigger_info, platform_type="device"
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -169,11 +169,35 @@ class BaseTrackerEntity(Entity):
|
||||
_attr_entity_category = EntityCategory.DIAGNOSTIC
|
||||
_attr_source_type: SourceType
|
||||
|
||||
def __init_subclass__(cls, **kwargs: Any) -> None:
|
||||
"""Post initialisation processing."""
|
||||
super().__init_subclass__(**kwargs)
|
||||
if "battery_level" in cls.__dict__:
|
||||
if cls.__module__.startswith("homeassistant.components."):
|
||||
# Don't ask users to report issue for built in integrations,
|
||||
# they already have issues opened on them.
|
||||
return
|
||||
report_issue = async_suggest_report_issue(
|
||||
async_get_hass_or_none(), module=cls.__module__
|
||||
)
|
||||
_LOGGER.warning(
|
||||
(
|
||||
"%s::%s is overriding the deprecated battery_level property on "
|
||||
"a subclass of BaseTrackerEntity, this will be unsupported from "
|
||||
"Home Assistant 2027.7, please %s"
|
||||
),
|
||||
cls.__module__,
|
||||
cls.__name__,
|
||||
report_issue,
|
||||
)
|
||||
|
||||
@cached_property
|
||||
def battery_level(self) -> int | None:
|
||||
"""Return the battery level of the device.
|
||||
|
||||
Percentage from 0-100.
|
||||
|
||||
The property is deprecated and will be removed in Home Assistant 2027.7.
|
||||
"""
|
||||
return None
|
||||
|
||||
|
||||
@@ -439,6 +439,19 @@ DISCOVERY_SCHEMAS = [
|
||||
),
|
||||
allow_multi=True, # also used for climate entity
|
||||
),
|
||||
MatterDiscoverySchema(
|
||||
platform=Platform.SENSOR,
|
||||
entity_description=MatterSensorEntityDescription(
|
||||
key="SoilMoistureSensor",
|
||||
native_unit_of_measurement=PERCENTAGE,
|
||||
device_class=SensorDeviceClass.MOISTURE,
|
||||
state_class=SensorStateClass.MEASUREMENT,
|
||||
),
|
||||
entity_class=MatterSensor,
|
||||
required_attributes=(
|
||||
clusters.SoilMeasurement.Attributes.SoilMoistureMeasuredValue,
|
||||
),
|
||||
),
|
||||
MatterDiscoverySchema(
|
||||
platform=Platform.SENSOR,
|
||||
entity_description=MatterSensorEntityDescription(
|
||||
|
||||
@@ -8,6 +8,6 @@
|
||||
"iot_class": "local_push",
|
||||
"loggers": ["wiim.sdk", "async_upnp_client"],
|
||||
"quality_scale": "bronze",
|
||||
"requirements": ["wiim==0.1.2"],
|
||||
"requirements": ["wiim==0.1.4"],
|
||||
"zeroconf": ["_linkplay._tcp.local."]
|
||||
}
|
||||
|
||||
@@ -349,15 +349,12 @@ class WiimMediaPlayerEntity(WiimBaseEntity, MediaPlayerEntity):
|
||||
sdk_status_str,
|
||||
)
|
||||
else:
|
||||
self._device.playing_status = sdk_status
|
||||
if sdk_status == SDKPlayingStatus.STOPPED:
|
||||
LOGGER.debug(
|
||||
"Device %s: TransportState is STOPPED."
|
||||
" Resetting media position and metadata",
|
||||
self.entity_id,
|
||||
)
|
||||
self._device.current_position = 0
|
||||
self._device.current_track_duration = 0
|
||||
self._attr_media_position_updated_at = None
|
||||
self._attr_media_duration = None
|
||||
self._attr_media_position = None
|
||||
|
||||
@@ -3,19 +3,5 @@
|
||||
"reload": {
|
||||
"service": "mdi:reload"
|
||||
}
|
||||
},
|
||||
"triggers": {
|
||||
"entered": {
|
||||
"trigger": "mdi:map-marker-plus"
|
||||
},
|
||||
"left": {
|
||||
"trigger": "mdi:map-marker-minus"
|
||||
},
|
||||
"occupancy_cleared": {
|
||||
"trigger": "mdi:account-off"
|
||||
},
|
||||
"occupancy_detected": {
|
||||
"trigger": "mdi:account-group"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,74 +1,8 @@
|
||||
{
|
||||
"common": {
|
||||
"trigger_behavior_name": "Trigger when",
|
||||
"trigger_for_name": "For at least",
|
||||
"trigger_zone_description": "The zone to trigger on.",
|
||||
"trigger_zone_name": "Zone"
|
||||
},
|
||||
"services": {
|
||||
"reload": {
|
||||
"description": "Reloads zones from the YAML-configuration.",
|
||||
"name": "Reload zones"
|
||||
}
|
||||
},
|
||||
"triggers": {
|
||||
"entered": {
|
||||
"description": "Triggers when one or more persons or device trackers enter a zone.",
|
||||
"fields": {
|
||||
"behavior": {
|
||||
"name": "[%key:component::zone::common::trigger_behavior_name%]"
|
||||
},
|
||||
"for": {
|
||||
"name": "[%key:component::zone::common::trigger_for_name%]"
|
||||
},
|
||||
"zone": {
|
||||
"description": "[%key:component::zone::common::trigger_zone_description%]",
|
||||
"name": "[%key:component::zone::common::trigger_zone_name%]"
|
||||
}
|
||||
},
|
||||
"name": "Entered zone"
|
||||
},
|
||||
"left": {
|
||||
"description": "Triggers when one or more persons or device trackers leave a zone.",
|
||||
"fields": {
|
||||
"behavior": {
|
||||
"name": "[%key:component::zone::common::trigger_behavior_name%]"
|
||||
},
|
||||
"for": {
|
||||
"name": "[%key:component::zone::common::trigger_for_name%]"
|
||||
},
|
||||
"zone": {
|
||||
"description": "[%key:component::zone::common::trigger_zone_description%]",
|
||||
"name": "[%key:component::zone::common::trigger_zone_name%]"
|
||||
}
|
||||
},
|
||||
"name": "Left zone"
|
||||
},
|
||||
"occupancy_cleared": {
|
||||
"description": "Triggers when a zone transitions from occupied to unoccupied.",
|
||||
"fields": {
|
||||
"for": {
|
||||
"name": "[%key:component::zone::common::trigger_for_name%]"
|
||||
},
|
||||
"zone": {
|
||||
"description": "[%key:component::zone::triggers::occupancy_detected::fields::zone::description%]",
|
||||
"name": "[%key:component::zone::triggers::occupancy_detected::fields::zone::name%]"
|
||||
}
|
||||
},
|
||||
"name": "Zone occupancy cleared"
|
||||
},
|
||||
"occupancy_detected": {
|
||||
"description": "Triggers when a zone transitions to an occupied state.",
|
||||
"fields": {
|
||||
"for": {
|
||||
"name": "[%key:component::zone::common::trigger_for_name%]"
|
||||
},
|
||||
"zone": {
|
||||
"description": "The zone to monitor.",
|
||||
"name": "Zone"
|
||||
}
|
||||
},
|
||||
"name": "Zone occupancy detected"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,26 +1,22 @@
|
||||
"""Offer zone automation rules."""
|
||||
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.device_tracker import ATTR_IN_ZONES
|
||||
from homeassistant.const import (
|
||||
ATTR_FRIENDLY_NAME,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_EVENT,
|
||||
CONF_FOR,
|
||||
CONF_OPTIONS,
|
||||
CONF_TARGET,
|
||||
CONF_PLATFORM,
|
||||
CONF_ZONE,
|
||||
)
|
||||
from homeassistant.core import (
|
||||
CALLBACK_TYPE,
|
||||
Event,
|
||||
EventStateChangedData,
|
||||
HassJob,
|
||||
HomeAssistant,
|
||||
State,
|
||||
callback,
|
||||
)
|
||||
from homeassistant.helpers import (
|
||||
@@ -28,18 +24,8 @@ from homeassistant.helpers import (
|
||||
entity_registry as er,
|
||||
location,
|
||||
)
|
||||
from homeassistant.helpers.automation import (
|
||||
DomainSpec,
|
||||
move_top_level_schema_fields_to_options,
|
||||
)
|
||||
from homeassistant.helpers.event import async_track_state_change_event
|
||||
from homeassistant.helpers.trigger import (
|
||||
ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR,
|
||||
EntityTriggerBase,
|
||||
Trigger,
|
||||
TriggerActionRunner,
|
||||
TriggerConfig,
|
||||
)
|
||||
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from . import condition
|
||||
@@ -52,232 +38,93 @@ _LOGGER = logging.getLogger(__name__)
|
||||
|
||||
_EVENT_DESCRIPTION = {EVENT_ENTER: "entering", EVENT_LEAVE: "leaving"}
|
||||
|
||||
_LEGACY_OPTIONS_SCHEMA: dict[vol.Marker, Any] = {
|
||||
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
|
||||
vol.Required(CONF_ZONE): cv.entity_id,
|
||||
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(EVENT_ENTER, EVENT_LEAVE),
|
||||
}
|
||||
|
||||
_LEGACY_TRIGGER_OPTIONS_SCHEMA = vol.Schema(
|
||||
_TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
|
||||
{
|
||||
vol.Required(CONF_OPTIONS): _LEGACY_OPTIONS_SCHEMA,
|
||||
},
|
||||
)
|
||||
|
||||
# New-style zone trigger schema
|
||||
_ZONE_TRIGGER_SCHEMA = ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR.extend(
|
||||
{
|
||||
vol.Required(CONF_OPTIONS): {
|
||||
vol.Required(CONF_ZONE): cv.entity_domain("zone"),
|
||||
},
|
||||
vol.Required(CONF_PLATFORM): "zone",
|
||||
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
|
||||
vol.Required(CONF_ZONE): cv.entity_id,
|
||||
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(
|
||||
EVENT_ENTER, EVENT_LEAVE
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
_DOMAIN_SPECS: dict[str, DomainSpec] = {
|
||||
"person": DomainSpec(),
|
||||
"device_tracker": DomainSpec(),
|
||||
}
|
||||
|
||||
async def async_validate_trigger_config(
|
||||
hass: HomeAssistant, config: ConfigType
|
||||
) -> ConfigType:
|
||||
"""Validate trigger config."""
|
||||
config = _TRIGGER_SCHEMA(config)
|
||||
registry = er.async_get(hass)
|
||||
config[CONF_ENTITY_ID] = er.async_validate_entity_ids(
|
||||
registry, config[CONF_ENTITY_ID]
|
||||
)
|
||||
return config
|
||||
|
||||
|
||||
class LegacyZoneTrigger(Trigger):
|
||||
"""Legacy zone trigger (platform: zone)."""
|
||||
async def async_attach_trigger(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
action: TriggerActionType,
|
||||
trigger_info: TriggerInfo,
|
||||
*,
|
||||
platform_type: str = "zone",
|
||||
) -> CALLBACK_TYPE:
|
||||
"""Listen for state changes based on configuration."""
|
||||
trigger_data = trigger_info["trigger_data"]
|
||||
entity_id: list[str] = config[CONF_ENTITY_ID]
|
||||
zone_entity_id: str = config[CONF_ZONE]
|
||||
event: str = config[CONF_EVENT]
|
||||
job = HassJob(action)
|
||||
|
||||
@classmethod
|
||||
async def async_validate_complete_config(
|
||||
cls, hass: HomeAssistant, complete_config: ConfigType
|
||||
) -> ConfigType:
|
||||
"""Validate complete config, migrating legacy format to options."""
|
||||
complete_config = move_top_level_schema_fields_to_options(
|
||||
complete_config, _LEGACY_OPTIONS_SCHEMA
|
||||
)
|
||||
return await super().async_validate_complete_config(hass, complete_config)
|
||||
@callback
|
||||
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
|
||||
"""Listen for state changes and calls action."""
|
||||
entity = zone_event.data["entity_id"]
|
||||
from_s = zone_event.data["old_state"]
|
||||
to_s = zone_event.data["new_state"]
|
||||
|
||||
@classmethod
|
||||
async def async_validate_config(
|
||||
cls, hass: HomeAssistant, config: ConfigType
|
||||
) -> ConfigType:
|
||||
"""Validate config."""
|
||||
config = cast(ConfigType, _LEGACY_TRIGGER_OPTIONS_SCHEMA(config))
|
||||
registry = er.async_get(hass)
|
||||
config[CONF_OPTIONS][CONF_ENTITY_ID] = er.async_validate_entity_ids(
|
||||
registry, config[CONF_OPTIONS][CONF_ENTITY_ID]
|
||||
)
|
||||
return config
|
||||
if (from_s and not location.has_location(from_s)) or (
|
||||
to_s and not location.has_location(to_s)
|
||||
):
|
||||
return
|
||||
|
||||
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
|
||||
"""Initialize trigger."""
|
||||
super().__init__(hass, config)
|
||||
if TYPE_CHECKING:
|
||||
assert config.options is not None
|
||||
self._options = config.options
|
||||
|
||||
async def async_attach_runner(
|
||||
self, run_action: TriggerActionRunner
|
||||
) -> CALLBACK_TYPE:
|
||||
"""Listen for state changes based on configuration."""
|
||||
entity_id: list[str] = self._options[CONF_ENTITY_ID]
|
||||
zone_entity_id: str = self._options[CONF_ZONE]
|
||||
event: str = self._options[CONF_EVENT]
|
||||
|
||||
@callback
|
||||
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
|
||||
"""Listen for state changes and calls action."""
|
||||
entity = zone_event.data["entity_id"]
|
||||
from_s = zone_event.data["old_state"]
|
||||
to_s = zone_event.data["new_state"]
|
||||
|
||||
if (from_s and not location.has_location(from_s)) or (
|
||||
to_s and not location.has_location(to_s)
|
||||
):
|
||||
return
|
||||
|
||||
if not (zone_state := self._hass.states.get(zone_entity_id)):
|
||||
_LOGGER.warning(
|
||||
"Non-existing zone '%s' in a zone trigger",
|
||||
zone_entity_id,
|
||||
)
|
||||
return
|
||||
|
||||
from_match = (
|
||||
condition.zone(self._hass, zone_state, from_s) if from_s else False
|
||||
if not (zone_state := hass.states.get(zone_entity_id)):
|
||||
_LOGGER.warning(
|
||||
(
|
||||
"Automation '%s' is referencing non-existing zone '%s' in a zone"
|
||||
" trigger"
|
||||
),
|
||||
trigger_info["name"],
|
||||
zone_entity_id,
|
||||
)
|
||||
to_match = condition.zone(self._hass, zone_state, to_s) if to_s else False
|
||||
return
|
||||
|
||||
if (event == EVENT_ENTER and not from_match and to_match) or (
|
||||
event == EVENT_LEAVE and from_match and not to_match
|
||||
):
|
||||
description = f"{entity} {_EVENT_DESCRIPTION[event]} {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
|
||||
run_action(
|
||||
{
|
||||
from_match = condition.zone(hass, zone_state, from_s) if from_s else False
|
||||
to_match = condition.zone(hass, zone_state, to_s) if to_s else False
|
||||
|
||||
if (event == EVENT_ENTER and not from_match and to_match) or (
|
||||
event == EVENT_LEAVE and from_match and not to_match
|
||||
):
|
||||
description = (
|
||||
f"{entity} {_EVENT_DESCRIPTION[event]}"
|
||||
f" {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
|
||||
)
|
||||
hass.async_run_hass_job(
|
||||
job,
|
||||
{
|
||||
"trigger": {
|
||||
**trigger_data,
|
||||
"platform": platform_type,
|
||||
"entity_id": entity,
|
||||
"from_state": from_s,
|
||||
"to_state": to_s,
|
||||
"zone": zone_state,
|
||||
"event": event,
|
||||
},
|
||||
description,
|
||||
to_s.context if to_s else None,
|
||||
)
|
||||
"description": description,
|
||||
}
|
||||
},
|
||||
to_s.context if to_s else None,
|
||||
)
|
||||
|
||||
return async_track_state_change_event(
|
||||
self._hass, entity_id, zone_automation_listener
|
||||
)
|
||||
|
||||
|
||||
class ZoneTriggerBase(EntityTriggerBase):
|
||||
"""Base for zone-based triggers targeting person and device_tracker entities."""
|
||||
|
||||
_domain_specs = _DOMAIN_SPECS
|
||||
_schema = _ZONE_TRIGGER_SCHEMA
|
||||
|
||||
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
|
||||
"""Initialize the trigger."""
|
||||
super().__init__(hass, config)
|
||||
self._zone: str = self._options[CONF_ZONE]
|
||||
|
||||
def _in_target_zone(self, state: State) -> bool:
|
||||
"""Check if the entity is in the selected zone."""
|
||||
in_zones = state.attributes.get(ATTR_IN_ZONES) or ()
|
||||
return self._zone in in_zones
|
||||
|
||||
|
||||
class EnteredZoneTrigger(ZoneTriggerBase):
|
||||
"""Trigger when an entity enters the selected zone."""
|
||||
|
||||
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
|
||||
"""Check that the entity was not already in the selected zone."""
|
||||
return not self._in_target_zone(from_state)
|
||||
|
||||
def is_valid_state(self, state: State) -> bool:
|
||||
"""Check that the entity is now in the selected zone."""
|
||||
return self._in_target_zone(state)
|
||||
|
||||
|
||||
class LeftZoneTrigger(ZoneTriggerBase):
|
||||
"""Trigger when an entity leaves the selected zone."""
|
||||
|
||||
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
|
||||
"""Check that the entity was previously in the selected zone."""
|
||||
return self._in_target_zone(from_state)
|
||||
|
||||
def is_valid_state(self, state: State) -> bool:
|
||||
"""Check that the entity is no longer in the selected zone."""
|
||||
return not self._in_target_zone(state)
|
||||
|
||||
|
||||
_OCCUPANCY_TRIGGER_SCHEMA = vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_OPTIONS, default={}): {
|
||||
vol.Required(CONF_ZONE): cv.entity_id,
|
||||
vol.Optional(CONF_FOR): cv.positive_time_period,
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
class _ZoneOccupancyTriggerBase(EntityTriggerBase):
|
||||
"""Base for zone occupancy triggers (single zone, no behavior)."""
|
||||
|
||||
_domain_specs = {"zone": DomainSpec()}
|
||||
_schema = _OCCUPANCY_TRIGGER_SCHEMA
|
||||
|
||||
@classmethod
|
||||
async def async_validate_config(
|
||||
cls, hass: HomeAssistant, config: ConfigType
|
||||
) -> ConfigType:
|
||||
"""Validate config and synthesize a target from the zone option."""
|
||||
config = cast(ConfigType, cls._schema(config))
|
||||
config[CONF_TARGET] = {CONF_ENTITY_ID: [config[CONF_OPTIONS][CONF_ZONE]]}
|
||||
return config
|
||||
|
||||
@staticmethod
|
||||
def _occupancy_count(state: State) -> int | None:
|
||||
"""Return the zone's persons-in-zone count; None if unparsable."""
|
||||
try:
|
||||
return int(state.state)
|
||||
except TypeError, ValueError:
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def _is_occupied(cls, state: State) -> bool:
|
||||
"""Return True if the zone has at least one occupant."""
|
||||
count = cls._occupancy_count(state)
|
||||
return count is not None and count >= 1
|
||||
|
||||
|
||||
class OccupancyDetectedTrigger(_ZoneOccupancyTriggerBase):
|
||||
"""Trigger when a zone transitions to an occupied state."""
|
||||
|
||||
def is_valid_state(self, state: State) -> bool:
|
||||
"""Check that the zone is occupied."""
|
||||
return self._is_occupied(state)
|
||||
|
||||
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
|
||||
"""Check that the zone was previously not occupied."""
|
||||
return not self._is_occupied(from_state)
|
||||
|
||||
|
||||
class OccupancyClearedTrigger(_ZoneOccupancyTriggerBase):
|
||||
"""Trigger when a zone transitions from occupied to unoccupied."""
|
||||
|
||||
def is_valid_state(self, state: State) -> bool:
|
||||
"""Check that the zone is empty (count == 0)."""
|
||||
return self._occupancy_count(state) == 0
|
||||
|
||||
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
|
||||
"""Check that the zone was previously occupied."""
|
||||
return self._is_occupied(from_state)
|
||||
|
||||
|
||||
TRIGGERS: dict[str, type[Trigger]] = {
|
||||
"_": LegacyZoneTrigger,
|
||||
"entered": EnteredZoneTrigger,
|
||||
"left": LeftZoneTrigger,
|
||||
"occupancy_detected": OccupancyDetectedTrigger,
|
||||
"occupancy_cleared": OccupancyClearedTrigger,
|
||||
}
|
||||
|
||||
|
||||
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
|
||||
"""Return the triggers for zones."""
|
||||
return TRIGGERS
|
||||
return async_track_state_change_event(hass, entity_id, zone_automation_listener)
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
.trigger_zone: &trigger_zone
|
||||
target:
|
||||
entity:
|
||||
domain:
|
||||
- person
|
||||
- device_tracker
|
||||
fields:
|
||||
behavior:
|
||||
required: true
|
||||
default: each
|
||||
selector:
|
||||
automation_behavior:
|
||||
mode: trigger
|
||||
for:
|
||||
required: true
|
||||
default: 00:00:00
|
||||
selector:
|
||||
duration:
|
||||
zone:
|
||||
required: true
|
||||
selector:
|
||||
entity:
|
||||
domain: zone
|
||||
|
||||
entered: *trigger_zone
|
||||
left: *trigger_zone
|
||||
|
||||
.trigger_occupancy: &trigger_occupancy
|
||||
fields:
|
||||
for:
|
||||
required: true
|
||||
default: 00:00:00
|
||||
selector:
|
||||
duration:
|
||||
zone:
|
||||
required: true
|
||||
selector:
|
||||
entity:
|
||||
domain: zone
|
||||
|
||||
occupancy_detected: *trigger_occupancy
|
||||
occupancy_cleared: *trigger_occupancy
|
||||
@@ -14,7 +14,7 @@ if TYPE_CHECKING:
|
||||
|
||||
APPLICATION_NAME: Final = "HomeAssistant"
|
||||
MAJOR_VERSION: Final = 2026
|
||||
MINOR_VERSION: Final = 6
|
||||
MINOR_VERSION: Final = 7
|
||||
PATCH_VERSION: Final = "0.dev0"
|
||||
__short_version__: Final = f"{MAJOR_VERSION}.{MINOR_VERSION}"
|
||||
__version__: Final = f"{__short_version__}.{PATCH_VERSION}"
|
||||
|
||||
@@ -1714,14 +1714,7 @@ def async_extract_entities(trigger_conf: dict) -> list[str]:
|
||||
return [trigger_conf[CONF_OPTIONS][CONF_ENTITY_ID]]
|
||||
|
||||
if trigger_conf[CONF_PLATFORM] == "zone":
|
||||
options = trigger_conf[CONF_OPTIONS]
|
||||
return [*options[CONF_ENTITY_ID], options[CONF_ZONE]]
|
||||
|
||||
if trigger_conf[CONF_PLATFORM] in ("zone.entered", "zone.left"):
|
||||
return [
|
||||
*async_extract_targets(trigger_conf, CONF_ENTITY_ID),
|
||||
trigger_conf[CONF_OPTIONS][CONF_ZONE],
|
||||
]
|
||||
return trigger_conf[CONF_ENTITY_ID] + [trigger_conf[CONF_ZONE]] # type: ignore[no-any-return]
|
||||
|
||||
if trigger_conf[CONF_PLATFORM] == "geo_location":
|
||||
return [trigger_conf[CONF_ZONE]]
|
||||
|
||||
+1
-1
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
|
||||
|
||||
[project]
|
||||
name = "homeassistant"
|
||||
version = "2026.6.0.dev0"
|
||||
version = "2026.7.0.dev0"
|
||||
license = "Apache-2.0"
|
||||
license-files = ["LICENSE*", "homeassistant/backports/LICENSE*"]
|
||||
description = "Open-source home automation platform running on Python 3."
|
||||
|
||||
Generated
+1
-1
@@ -3357,7 +3357,7 @@ whois==0.9.27
|
||||
wiffi==1.1.2
|
||||
|
||||
# homeassistant.components.wiim
|
||||
wiim==0.1.2
|
||||
wiim==0.1.4
|
||||
|
||||
# homeassistant.components.wirelesstag
|
||||
wirelesstagpy==0.8.1
|
||||
|
||||
@@ -2088,6 +2088,36 @@ async def test_receive_backup_path_traversal(
|
||||
assert resp.status == 400
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"name",
|
||||
[
|
||||
"/absolute/path",
|
||||
"../parent",
|
||||
"with/slash",
|
||||
],
|
||||
)
|
||||
async def test_receive_backup_rejects_unsafe_inner_name(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
name: str,
|
||||
) -> None:
|
||||
"""Test receive backup rejects an inner name that would escape the backup dir."""
|
||||
await setup_backup_integration(hass)
|
||||
client = await hass_client()
|
||||
|
||||
backup = replace(TEST_BACKUP_ABC123, name=name)
|
||||
with patch(
|
||||
"homeassistant.components.backup.manager.read_backup",
|
||||
return_value=backup,
|
||||
):
|
||||
resp = await client.post(
|
||||
"/api/backup/upload?agent_id=backup.local",
|
||||
data={"file": StringIO("test")},
|
||||
)
|
||||
|
||||
assert resp.status == 400
|
||||
|
||||
|
||||
async def test_receive_backup_busy_manager(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
|
||||
@@ -14,6 +14,7 @@ import pytest
|
||||
import securetar
|
||||
|
||||
from homeassistant.components.backup import DOMAIN, AddonInfo, AgentBackup, Folder
|
||||
from homeassistant.components.backup.models import InvalidBackupFilename
|
||||
from homeassistant.components.backup.util import (
|
||||
DecryptedBackupStreamer,
|
||||
EncryptedBackupStreamer,
|
||||
@@ -158,6 +159,37 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
|
||||
assert backup == expected_backup
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"name",
|
||||
[
|
||||
"/absolute/path",
|
||||
"../parent",
|
||||
"with/slash",
|
||||
"with\\backslash",
|
||||
"C:\\drive\\path",
|
||||
"",
|
||||
".",
|
||||
"..",
|
||||
],
|
||||
)
|
||||
def test_read_backup_rejects_unsafe_name(name: str) -> None:
|
||||
"""Test that read_backup rejects names that could escape the backup directory."""
|
||||
backup_json_content = (
|
||||
b'{"compressed":true,"date":"2024-12-02T07:23:58.261875-05:00","homeassistant":'
|
||||
b'{"exclude_database":true,"version":"2024.12.0.dev0"},"name":"'
|
||||
+ name.encode().replace(b"\\", b"\\\\")
|
||||
+ b'","protected":true,"slug":"455645fe","type":"partial","version":2}'
|
||||
)
|
||||
mock_path = Mock()
|
||||
mock_path.stat.return_value.st_size = 1234
|
||||
|
||||
with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
|
||||
tar_ctx = mock_open_tar.return_value.__enter__.return_value
|
||||
tar_ctx.extractfile.return_value.read.return_value = backup_json_content
|
||||
with pytest.raises(InvalidBackupFilename):
|
||||
read_backup(mock_path)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("backup", "password", "validation_result", "expected_messages"),
|
||||
[
|
||||
|
||||
+10
-29
@@ -1586,12 +1586,12 @@ async def _validate_trigger_options(
|
||||
options: dict[str, Any] | None,
|
||||
*,
|
||||
valid: bool,
|
||||
supports_target: bool = True,
|
||||
) -> None:
|
||||
"""Assert that a trigger accepts or rejects the given options during validation."""
|
||||
trigger_config: dict[str, Any] = {CONF_PLATFORM: trigger}
|
||||
if supports_target:
|
||||
trigger_config[CONF_TARGET] = {ATTR_LABEL_ID: "test_label"}
|
||||
trigger_config: dict[str, Any] = {
|
||||
CONF_PLATFORM: trigger,
|
||||
CONF_TARGET: {ATTR_LABEL_ID: "test_label"},
|
||||
}
|
||||
if options is not None:
|
||||
trigger_config[CONF_OPTIONS] = options
|
||||
if valid:
|
||||
@@ -1608,7 +1608,6 @@ async def assert_trigger_options_supported(
|
||||
*,
|
||||
supports_behavior: bool,
|
||||
supports_duration: bool,
|
||||
supports_target: bool = True,
|
||||
) -> None:
|
||||
"""Assert which options a trigger supports.
|
||||
|
||||
@@ -1625,15 +1624,9 @@ async def assert_trigger_options_supported(
|
||||
|
||||
# Minimal config should always be valid
|
||||
supports_empty = not bool(base_options)
|
||||
await _validate_trigger_options(
|
||||
hass, trigger, None, valid=supports_empty, supports_target=supports_target
|
||||
)
|
||||
await _validate_trigger_options(
|
||||
hass, trigger, {}, valid=supports_empty, supports_target=supports_target
|
||||
)
|
||||
await _validate_trigger_options(
|
||||
hass, trigger, base_options, valid=True, supports_target=supports_target
|
||||
)
|
||||
await _validate_trigger_options(hass, trigger, None, valid=supports_empty)
|
||||
await _validate_trigger_options(hass, trigger, {}, valid=supports_empty)
|
||||
await _validate_trigger_options(hass, trigger, base_options, valid=True)
|
||||
|
||||
def _merge(extra: dict[str, Any]) -> dict[str, Any]:
|
||||
return {**(base_options or {}), **extra}
|
||||
@@ -1641,30 +1634,18 @@ async def assert_trigger_options_supported(
|
||||
# Behavior
|
||||
for behavior in ("each", "first", "all"):
|
||||
await _validate_trigger_options(
|
||||
hass,
|
||||
trigger,
|
||||
_merge({"behavior": behavior}),
|
||||
valid=supports_behavior,
|
||||
supports_target=supports_target,
|
||||
hass, trigger, _merge({"behavior": behavior}), valid=supports_behavior
|
||||
)
|
||||
|
||||
# Duration
|
||||
for for_value in ({"seconds": 5}, "00:00:05", 5):
|
||||
await _validate_trigger_options(
|
||||
hass,
|
||||
trigger,
|
||||
_merge({"for": for_value}),
|
||||
valid=supports_duration,
|
||||
supports_target=supports_target,
|
||||
hass, trigger, _merge({"for": for_value}), valid=supports_duration
|
||||
)
|
||||
|
||||
# Unknown option should always be rejected
|
||||
await _validate_trigger_options(
|
||||
hass,
|
||||
trigger,
|
||||
_merge({"unknown_option": True}),
|
||||
valid=False,
|
||||
supports_target=supports_target,
|
||||
hass, trigger, _merge({"unknown_option": True}), valid=False
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -1379,6 +1379,31 @@ def test_base_tracker_entity() -> None:
|
||||
entity.state_attributes # noqa: B018
|
||||
|
||||
|
||||
def test_battery_level_override_deprecation_warning(
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that overriding battery_level in a subclass logs a warning."""
|
||||
error_message = "is overriding the deprecated battery_level property"
|
||||
|
||||
caplog.clear()
|
||||
|
||||
class _SubclassWithOverride(TrackerEntity):
|
||||
@property
|
||||
def battery_level(self) -> int | None:
|
||||
return 50
|
||||
|
||||
assert error_message in caplog.text
|
||||
assert _SubclassWithOverride.__name__ in caplog.text
|
||||
|
||||
# No warning for a subclass that does not override battery_level
|
||||
caplog.clear()
|
||||
|
||||
class _SubclassWithoutOverride(TrackerEntity):
|
||||
pass
|
||||
|
||||
assert error_message not in caplog.text
|
||||
|
||||
|
||||
async def test_attr_location_name_deprecation_warning(
|
||||
hass: HomeAssistant,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
|
||||
@@ -78,6 +78,7 @@ FIXTURES = [
|
||||
"mock_pressure_sensor",
|
||||
"mock_pump",
|
||||
"mock_room_airconditioner",
|
||||
"mock_soil_sensor",
|
||||
"mock_solar_inverter",
|
||||
"mock_speaker",
|
||||
"mock_switch_unit",
|
||||
|
||||
@@ -0,0 +1,87 @@
|
||||
{
|
||||
"node_id": 101,
|
||||
"date_commissioned": "2024-11-27T00:00:00.000000",
|
||||
"last_interview": "2024-11-27T00:00:00.000000",
|
||||
"interview_version": 2,
|
||||
"attributes": {
|
||||
"0/29/0": [
|
||||
{
|
||||
"0": 22,
|
||||
"1": 1
|
||||
}
|
||||
],
|
||||
"0/29/1": [
|
||||
4, 29, 31, 40, 42, 43, 44, 48, 49, 50, 51, 52, 53, 54, 55, 59, 60, 62, 63,
|
||||
64, 65
|
||||
],
|
||||
"0/29/2": [41],
|
||||
"0/29/3": [1],
|
||||
"0/29/65532": 0,
|
||||
"0/29/65533": 1,
|
||||
"0/29/65528": [],
|
||||
"0/29/65529": [],
|
||||
"0/29/65531": [0, 1, 2, 3, 65528, 65529, 65531, 65532, 65533],
|
||||
"0/40/0": 1,
|
||||
"0/40/1": "Nabu Casa",
|
||||
"0/40/2": 65521,
|
||||
"0/40/3": "Mock SoilSensor",
|
||||
"0/40/4": 32768,
|
||||
"0/40/5": "Mock Soil Sensor",
|
||||
"0/40/6": "XX",
|
||||
"0/40/7": 0,
|
||||
"0/40/8": "v1.0",
|
||||
"0/40/9": 1,
|
||||
"0/40/10": "v1.0",
|
||||
"0/40/11": "20241127",
|
||||
"0/40/12": "",
|
||||
"0/40/13": "",
|
||||
"0/40/14": "",
|
||||
"0/40/15": "TEST_SN",
|
||||
"0/40/16": false,
|
||||
"0/40/17": true,
|
||||
"0/40/18": "mock-soil-sensor",
|
||||
"0/40/19": {
|
||||
"0": 3,
|
||||
"1": 3
|
||||
},
|
||||
"0/40/65532": 0,
|
||||
"0/40/65533": 1,
|
||||
"0/40/65528": [],
|
||||
"0/40/65529": [],
|
||||
"0/40/65531": [
|
||||
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
|
||||
65528, 65529, 65531, 65532, 65533
|
||||
],
|
||||
"1/3/65529": [0, 64],
|
||||
"1/3/65531": [0, 1, 65528, 65529, 65531, 65532, 65533],
|
||||
"1/29/0": [
|
||||
{
|
||||
"0": 69,
|
||||
"1": 1
|
||||
}
|
||||
],
|
||||
"1/29/1": [3, 29, 57, 1072, 40],
|
||||
"1/29/2": [],
|
||||
"1/29/3": [9, 10],
|
||||
"1/29/65532": null,
|
||||
"1/29/65533": 1,
|
||||
"1/29/65528": [],
|
||||
"1/29/65529": [],
|
||||
"1/29/65531": [0, 1, 2, 3, 65528, 65529, 65531, 65532, 65533],
|
||||
"1/1072/0": {
|
||||
"0": 17,
|
||||
"1": true,
|
||||
"2": 0,
|
||||
"3": 100,
|
||||
"4": []
|
||||
},
|
||||
"1/1072/1": 50,
|
||||
"1/1072/65532": 0,
|
||||
"1/1072/65533": 1,
|
||||
"1/1072/65528": [],
|
||||
"1/1072/65529": [],
|
||||
"1/1072/65531": [0, 1, 65528, 65529, 65531, 65532, 65533]
|
||||
},
|
||||
"available": true,
|
||||
"attribute_subscriptions": []
|
||||
}
|
||||
@@ -18619,6 +18619,61 @@
|
||||
'state': '0.0',
|
||||
})
|
||||
# ---
|
||||
# name: test_sensors[mock_soil_sensor][sensor.mock_soil_sensor_moisture-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': list([
|
||||
None,
|
||||
]),
|
||||
'area_id': None,
|
||||
'capabilities': dict({
|
||||
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
|
||||
}),
|
||||
'config_entry_id': <ANY>,
|
||||
'config_subentry_id': <ANY>,
|
||||
'device_class': None,
|
||||
'device_id': <ANY>,
|
||||
'disabled_by': None,
|
||||
'domain': 'sensor',
|
||||
'entity_category': None,
|
||||
'entity_id': 'sensor.mock_soil_sensor_moisture',
|
||||
'has_entity_name': True,
|
||||
'hidden_by': None,
|
||||
'icon': None,
|
||||
'id': <ANY>,
|
||||
'labels': set({
|
||||
}),
|
||||
'name': None,
|
||||
'object_id_base': 'Moisture',
|
||||
'options': dict({
|
||||
}),
|
||||
'original_device_class': <SensorDeviceClass.MOISTURE: 'moisture'>,
|
||||
'original_icon': None,
|
||||
'original_name': 'Moisture',
|
||||
'platform': 'matter',
|
||||
'previous_unique_id': None,
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': None,
|
||||
'unique_id': '00000000000004D2-0000000000000065-MatterNodeDevice-1-SoilMoistureSensor-1072-1',
|
||||
'unit_of_measurement': '%',
|
||||
})
|
||||
# ---
|
||||
# name: test_sensors[mock_soil_sensor][sensor.mock_soil_sensor_moisture-state]
|
||||
StateSnapshot({
|
||||
'attributes': ReadOnlyDict({
|
||||
'device_class': 'moisture',
|
||||
'friendly_name': 'Mock Soil Sensor Moisture',
|
||||
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
|
||||
'unit_of_measurement': '%',
|
||||
}),
|
||||
'context': <ANY>,
|
||||
'entity_id': 'sensor.mock_soil_sensor_moisture',
|
||||
'last_changed': <ANY>,
|
||||
'last_reported': <ANY>,
|
||||
'last_updated': <ANY>,
|
||||
'state': '50',
|
||||
})
|
||||
# ---
|
||||
# name: test_sensors[mock_solar_inverter][sensor.mock_solar_inverter_active_current-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': list([
|
||||
|
||||
@@ -85,6 +85,25 @@ async def test_humidity_sensor(
|
||||
assert state.state == "40.0"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_fixture", ["mock_soil_sensor"])
|
||||
async def test_soil_moisture_sensor(
|
||||
hass: HomeAssistant,
|
||||
matter_client: MagicMock,
|
||||
matter_node: MatterNode,
|
||||
) -> None:
|
||||
"""Test soil moisture sensor."""
|
||||
state = hass.states.get("sensor.mock_soil_sensor_moisture")
|
||||
assert state
|
||||
assert state.state == "50"
|
||||
|
||||
set_node_attribute(matter_node, 1, 1072, 1, 75)
|
||||
await trigger_subscription_callback(hass, matter_client)
|
||||
|
||||
state = hass.states.get("sensor.mock_soil_sensor_moisture")
|
||||
assert state
|
||||
assert state.state == "75"
|
||||
|
||||
|
||||
@pytest.mark.parametrize("node_fixture", ["mock_light_sensor"])
|
||||
async def test_light_sensor(
|
||||
hass: HomeAssistant,
|
||||
|
||||
@@ -38,5 +38,6 @@ async def fire_transport_update(
|
||||
"""Trigger the registered AVTransport callback on the mock device."""
|
||||
assert mock_device.av_transport_event_callback is not None
|
||||
mock_device.event_data = {"TransportState": transport_state.value}
|
||||
mock_device.playing_status = transport_state
|
||||
mock_device.av_transport_event_callback(MagicMock(), [])
|
||||
await hass.async_block_till_done()
|
||||
|
||||
@@ -1,36 +1,14 @@
|
||||
"""The tests for the location automation."""
|
||||
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import automation, zone
|
||||
from homeassistant.const import (
|
||||
ATTR_ENTITY_ID,
|
||||
ENTITY_MATCH_ALL,
|
||||
SERVICE_TURN_OFF,
|
||||
STATE_UNAVAILABLE,
|
||||
STATE_UNKNOWN,
|
||||
)
|
||||
from homeassistant.const import ATTR_ENTITY_ID, ENTITY_MATCH_ALL, SERVICE_TURN_OFF
|
||||
from homeassistant.core import Context, HomeAssistant, ServiceCall
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
from homeassistant.helpers.trigger import async_validate_trigger_config
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.common import async_fire_time_changed, mock_component
|
||||
from tests.components.common import (
|
||||
TriggerStateDescription,
|
||||
assert_trigger_behavior_all,
|
||||
assert_trigger_behavior_each,
|
||||
assert_trigger_behavior_first,
|
||||
assert_trigger_options_supported,
|
||||
parametrize_target_entities,
|
||||
parametrize_trigger_states,
|
||||
target_entities,
|
||||
)
|
||||
from tests.common import mock_component
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
@@ -365,7 +343,10 @@ async def test_unknown_zone(
|
||||
},
|
||||
)
|
||||
|
||||
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" not in caplog.text
|
||||
assert (
|
||||
"Automation 'My Automation' is referencing non-existing zone"
|
||||
" 'zone.no_such_zone' in a zone trigger" not in caplog.text
|
||||
)
|
||||
|
||||
hass.states.async_set(
|
||||
"test.entity",
|
||||
@@ -375,390 +356,7 @@ async def test_unknown_zone(
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" in caplog.text
|
||||
|
||||
|
||||
# --- New-style zone trigger tests ---
|
||||
|
||||
ZONE_HOME = "zone.home"
|
||||
ZONE_WORK = "zone.work"
|
||||
IN_ZONES_HOME = {"in_zones": [ZONE_HOME]}
|
||||
IN_ZONES_WORK = {"in_zones": [ZONE_WORK]}
|
||||
IN_ZONES_NONE: dict[str, list[str]] = {"in_zones": []}
|
||||
TRIGGER_ZONE = ZONE_HOME
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_key", "base_options", "supports_behavior", "supports_duration"),
|
||||
[
|
||||
("zone.entered", {"zone": TRIGGER_ZONE}, True, True),
|
||||
("zone.left", {"zone": TRIGGER_ZONE}, True, True),
|
||||
],
|
||||
)
|
||||
async def test_zone_trigger_options_validation(
|
||||
hass: HomeAssistant,
|
||||
trigger_key: str,
|
||||
base_options: dict[str, Any] | None,
|
||||
supports_behavior: bool,
|
||||
supports_duration: bool,
|
||||
) -> None:
|
||||
"""Test that zone triggers support the expected options."""
|
||||
await assert_trigger_options_supported(
|
||||
hass,
|
||||
trigger_key,
|
||||
base_options,
|
||||
supports_behavior=supports_behavior,
|
||||
supports_duration=supports_duration,
|
||||
assert (
|
||||
"Automation 'My Automation' is referencing non-existing zone"
|
||||
" 'zone.no_such_zone' in a zone trigger" in caplog.text
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("trigger_key", ["zone.entered", "zone.left"])
|
||||
async def test_zone_trigger_rejects_non_zone_entity_id(
|
||||
hass: HomeAssistant, trigger_key: str
|
||||
) -> None:
|
||||
"""Test that the zone option must reference entities in the zone domain."""
|
||||
with pytest.raises(vol.Invalid):
|
||||
await async_validate_trigger_config(
|
||||
hass,
|
||||
[
|
||||
{
|
||||
"platform": trigger_key,
|
||||
"target": {"entity_id": "person.alice"},
|
||||
"options": {"zone": "person.alice"},
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def target_zone_entities(
|
||||
hass: HomeAssistant, domain: str
|
||||
) -> dict[str, list[str]]:
|
||||
"""Create multiple zone-trackable entities associated with different targets."""
|
||||
return await target_entities(hass, domain, domain_excluded="sensor")
|
||||
|
||||
|
||||
_ZONE_TRIGGER_STATES = [
|
||||
*parametrize_trigger_states(
|
||||
trigger="zone.entered",
|
||||
trigger_options={"zone": TRIGGER_ZONE},
|
||||
target_states=[
|
||||
("home", IN_ZONES_HOME),
|
||||
],
|
||||
other_states=[
|
||||
("not_home", IN_ZONES_NONE),
|
||||
("Work", IN_ZONES_WORK),
|
||||
],
|
||||
),
|
||||
*parametrize_trigger_states(
|
||||
trigger="zone.left",
|
||||
trigger_options={"zone": TRIGGER_ZONE},
|
||||
target_states=[
|
||||
("not_home", IN_ZONES_NONE),
|
||||
("Work", IN_ZONES_WORK),
|
||||
],
|
||||
other_states=[
|
||||
("home", IN_ZONES_HOME),
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def _parametrize_zone_target_entities() -> list[tuple[dict[str, Any], str, int, str]]:
|
||||
"""Parametrize target entities for all supported zone trigger domains."""
|
||||
return [
|
||||
(*params, domain)
|
||||
for domain in ("person", "device_tracker")
|
||||
for params in parametrize_target_entities(domain)
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
|
||||
_parametrize_zone_target_entities(),
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("trigger", "trigger_options", "states"),
|
||||
_ZONE_TRIGGER_STATES,
|
||||
)
|
||||
async def test_zone_trigger_behavior_each(
|
||||
hass: HomeAssistant,
|
||||
target_zone_entities: dict[str, list[str]],
|
||||
trigger_target_config: dict[str, Any],
|
||||
entity_id: str,
|
||||
entities_in_target: int,
|
||||
trigger: str,
|
||||
trigger_options: dict[str, Any],
|
||||
states: list[TriggerStateDescription],
|
||||
) -> None:
|
||||
"""Test zone triggers fire when any targeted entity changes."""
|
||||
await assert_trigger_behavior_each(
|
||||
hass,
|
||||
target_entities=target_zone_entities,
|
||||
trigger_target_config=trigger_target_config,
|
||||
entity_id=entity_id,
|
||||
entities_in_target=entities_in_target,
|
||||
trigger=trigger,
|
||||
trigger_options=trigger_options,
|
||||
states=states,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
|
||||
_parametrize_zone_target_entities(),
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("trigger", "trigger_options", "states"),
|
||||
_ZONE_TRIGGER_STATES,
|
||||
)
|
||||
async def test_zone_trigger_behavior_first(
|
||||
hass: HomeAssistant,
|
||||
target_zone_entities: dict[str, list[str]],
|
||||
trigger_target_config: dict[str, Any],
|
||||
entity_id: str,
|
||||
entities_in_target: int,
|
||||
trigger: str,
|
||||
trigger_options: dict[str, Any],
|
||||
states: list[TriggerStateDescription],
|
||||
) -> None:
|
||||
"""Test zone triggers fire when first targeted entity changes."""
|
||||
await assert_trigger_behavior_first(
|
||||
hass,
|
||||
target_entities=target_zone_entities,
|
||||
trigger_target_config=trigger_target_config,
|
||||
entity_id=entity_id,
|
||||
entities_in_target=entities_in_target,
|
||||
trigger=trigger,
|
||||
trigger_options=trigger_options,
|
||||
states=states,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
|
||||
_parametrize_zone_target_entities(),
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
("trigger", "trigger_options", "states"),
|
||||
_ZONE_TRIGGER_STATES,
|
||||
)
|
||||
async def test_zone_trigger_behavior_all(
|
||||
hass: HomeAssistant,
|
||||
target_zone_entities: dict[str, list[str]],
|
||||
trigger_target_config: dict[str, Any],
|
||||
entity_id: str,
|
||||
entities_in_target: int,
|
||||
trigger: str,
|
||||
trigger_options: dict[str, Any],
|
||||
states: list[TriggerStateDescription],
|
||||
) -> None:
|
||||
"""Test zone triggers fire when last targeted entity changes."""
|
||||
await assert_trigger_behavior_all(
|
||||
hass,
|
||||
target_entities=target_zone_entities,
|
||||
trigger_target_config=trigger_target_config,
|
||||
entity_id=entity_id,
|
||||
entities_in_target=entities_in_target,
|
||||
trigger=trigger,
|
||||
trigger_options=trigger_options,
|
||||
states=states,
|
||||
)
|
||||
|
||||
|
||||
# --- Zone occupancy trigger tests ---
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("enable_labs_preview_features")
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_key"),
|
||||
["zone.occupancy_detected", "zone.occupancy_cleared"],
|
||||
)
|
||||
async def test_zone_occupancy_trigger_options_validation(
|
||||
hass: HomeAssistant,
|
||||
trigger_key: str,
|
||||
) -> None:
|
||||
"""Test that occupancy triggers support the expected options."""
|
||||
await assert_trigger_options_supported(
|
||||
hass,
|
||||
trigger_key,
|
||||
{"zone": ZONE_HOME},
|
||||
supports_behavior=False,
|
||||
supports_duration=True,
|
||||
supports_target=False,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("enable_labs_preview_features")
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_key", "from_state", "to_state", "should_fire"),
|
||||
[
|
||||
# occupancy_detected
|
||||
pytest.param("zone.occupancy_detected", "0", "1", True, id="detected_0_to_1"),
|
||||
pytest.param("zone.occupancy_detected", "0", "3", True, id="detected_0_to_3"),
|
||||
pytest.param("zone.occupancy_detected", "1", "2", False, id="detected_1_to_2"),
|
||||
pytest.param("zone.occupancy_detected", "2", "0", False, id="detected_2_to_0"),
|
||||
pytest.param(
|
||||
"zone.occupancy_detected",
|
||||
STATE_UNKNOWN,
|
||||
"1",
|
||||
False,
|
||||
id="detected_unknown_to_1",
|
||||
),
|
||||
pytest.param(
|
||||
"zone.occupancy_detected",
|
||||
STATE_UNAVAILABLE,
|
||||
"1",
|
||||
False,
|
||||
id="detected_unavailable_to_1",
|
||||
),
|
||||
pytest.param(
|
||||
"zone.occupancy_detected",
|
||||
"0",
|
||||
STATE_UNAVAILABLE,
|
||||
False,
|
||||
id="detected_0_to_unavailable",
|
||||
),
|
||||
# occupancy_cleared
|
||||
pytest.param("zone.occupancy_cleared", "1", "0", True, id="cleared_1_to_0"),
|
||||
pytest.param("zone.occupancy_cleared", "3", "0", True, id="cleared_3_to_0"),
|
||||
pytest.param("zone.occupancy_cleared", "2", "1", False, id="cleared_2_to_1"),
|
||||
pytest.param("zone.occupancy_cleared", "0", "1", False, id="cleared_0_to_1"),
|
||||
pytest.param(
|
||||
"zone.occupancy_cleared",
|
||||
"1",
|
||||
STATE_UNAVAILABLE,
|
||||
False,
|
||||
id="cleared_1_to_unavailable",
|
||||
),
|
||||
pytest.param(
|
||||
"zone.occupancy_cleared",
|
||||
"1",
|
||||
STATE_UNKNOWN,
|
||||
False,
|
||||
id="cleared_1_to_unknown",
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_zone_occupancy_trigger_transitions(
|
||||
hass: HomeAssistant,
|
||||
service_calls: list[ServiceCall],
|
||||
trigger_key: str,
|
||||
from_state: str,
|
||||
to_state: str,
|
||||
should_fire: bool,
|
||||
) -> None:
|
||||
"""Test occupancy triggers fire on the expected numeric-state transitions."""
|
||||
hass.states.async_set(ZONE_HOME, from_state)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
automation.DOMAIN,
|
||||
{
|
||||
automation.DOMAIN: {
|
||||
"trigger": {
|
||||
"trigger": trigger_key,
|
||||
"options": {"zone": ZONE_HOME},
|
||||
},
|
||||
"action": {"service": "test.automation"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
hass.states.async_set(ZONE_HOME, to_state)
|
||||
await hass.async_block_till_done()
|
||||
assert (len(service_calls) == 1) is should_fire
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("enable_labs_preview_features")
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_key", "from_value", "to_value", "revert_value"),
|
||||
[
|
||||
("zone.occupancy_detected", "0", "1", "0"),
|
||||
("zone.occupancy_cleared", "1", "0", "1"),
|
||||
],
|
||||
)
|
||||
async def test_zone_occupancy_trigger_for_duration(
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
service_calls: list[ServiceCall],
|
||||
trigger_key: str,
|
||||
from_value: str,
|
||||
to_value: str,
|
||||
revert_value: str,
|
||||
) -> None:
|
||||
"""Test that `for` delays the firing and an early revert cancels it."""
|
||||
hass.states.async_set(ZONE_HOME, from_value)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
automation.DOMAIN,
|
||||
{
|
||||
automation.DOMAIN: {
|
||||
"trigger": {
|
||||
"trigger": trigger_key,
|
||||
"options": {"zone": ZONE_HOME, "for": {"seconds": 5}},
|
||||
},
|
||||
"action": {"service": "test.automation"},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Transition, then revert before the duration elapses -> no fire.
|
||||
hass.states.async_set(ZONE_HOME, to_value)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(ZONE_HOME, revert_value)
|
||||
await hass.async_block_till_done()
|
||||
freezer.tick(timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(service_calls) == 0
|
||||
|
||||
# Transition and hold past the duration -> fire once.
|
||||
hass.states.async_set(ZONE_HOME, to_value)
|
||||
await hass.async_block_till_done()
|
||||
freezer.tick(timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(service_calls) == 1
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("enable_labs_preview_features")
|
||||
async def test_zone_occupancy_trigger_payload(
|
||||
hass: HomeAssistant, service_calls: list[ServiceCall]
|
||||
) -> None:
|
||||
"""Test the payload exposed to the action template."""
|
||||
hass.states.async_set(ZONE_HOME, "0")
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert await async_setup_component(
|
||||
hass,
|
||||
automation.DOMAIN,
|
||||
{
|
||||
automation.DOMAIN: {
|
||||
"trigger": {
|
||||
"trigger": "zone.occupancy_detected",
|
||||
"options": {"zone": ZONE_HOME},
|
||||
},
|
||||
"action": {
|
||||
"service": "test.automation",
|
||||
"data_template": {
|
||||
"some": (
|
||||
"{{ trigger.entity_id }}"
|
||||
" - {{ trigger.from_state.state }}"
|
||||
" - {{ trigger.to_state.state }}"
|
||||
" - {{ trigger.for }}"
|
||||
)
|
||||
},
|
||||
},
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
hass.states.async_set(ZONE_HOME, "2")
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(service_calls) == 1
|
||||
assert service_calls[0].data["some"] == f"{ZONE_HOME} - 0 - 2 - None"
|
||||
|
||||
+19
-137
@@ -14,7 +14,6 @@ from pytest_unordered import unordered
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import automation
|
||||
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
|
||||
from homeassistant.components.sun import DOMAIN as SUN_DOMAIN
|
||||
from homeassistant.components.system_health import DOMAIN as SYSTEM_HEALTH_DOMAIN
|
||||
from homeassistant.components.tag import DOMAIN as TAG_DOMAIN
|
||||
@@ -42,11 +41,7 @@ from homeassistant.core import (
|
||||
callback,
|
||||
)
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import (
|
||||
config_validation as cv,
|
||||
device_registry as dr,
|
||||
trigger,
|
||||
)
|
||||
from homeassistant.helpers import config_validation as cv, trigger
|
||||
from homeassistant.helpers.automation import (
|
||||
DomainSpec,
|
||||
move_top_level_schema_fields_to_options,
|
||||
@@ -83,7 +78,6 @@ from homeassistant.util.unit_conversion import TemperatureConverter
|
||||
from homeassistant.util.yaml.loader import parse_yaml
|
||||
|
||||
from tests.common import (
|
||||
MockConfigEntry,
|
||||
MockModule,
|
||||
MockPlatform,
|
||||
async_fire_time_changed,
|
||||
@@ -4624,34 +4618,6 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
|
||||
unsub()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_test_modern_trigger(hass: HomeAssistant) -> None:
|
||||
"""Register a mock 'test' integration and trigger platform exposing 'test.modern'."""
|
||||
|
||||
class MockModernTrigger(Trigger):
|
||||
"""Mock modern trigger that accepts any options/target."""
|
||||
|
||||
@classmethod
|
||||
async def async_validate_config(
|
||||
cls, hass: HomeAssistant, config: ConfigType
|
||||
) -> ConfigType:
|
||||
return config
|
||||
|
||||
async def async_attach_runner(
|
||||
self, run_action: TriggerActionRunner
|
||||
) -> CALLBACK_TYPE:
|
||||
return lambda: None
|
||||
|
||||
async def async_get_triggers(
|
||||
hass: HomeAssistant,
|
||||
) -> dict[str, type[Trigger]]:
|
||||
return {"modern": MockModernTrigger}
|
||||
|
||||
mock_integration(hass, MockModule("test"))
|
||||
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_test_modern_trigger")
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_conf", "expected"),
|
||||
[
|
||||
@@ -4661,7 +4627,7 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
|
||||
id="state",
|
||||
),
|
||||
pytest.param(
|
||||
{"platform": "numeric_state", "entity_id": ["sensor.a"], "above": 5},
|
||||
{"platform": "numeric_state", "entity_id": ["sensor.a"]},
|
||||
["sensor.a"],
|
||||
id="numeric_state",
|
||||
),
|
||||
@@ -4673,61 +4639,36 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "zone",
|
||||
"options": {
|
||||
"entity_id": ["person.a"],
|
||||
"zone": "zone.home",
|
||||
"event": "enter",
|
||||
},
|
||||
"entity_id": ["person.a"],
|
||||
"zone": "zone.home",
|
||||
"event": "enter",
|
||||
},
|
||||
["person.a", "zone.home"],
|
||||
id="zone-legacy",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "zone.entered",
|
||||
"target": {"entity_id": ["person.a", "device_tracker.b"]},
|
||||
"options": {"zone": "zone.home"},
|
||||
},
|
||||
["person.a", "device_tracker.b", "zone.home"],
|
||||
id="zone-entered-modern",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "zone.left",
|
||||
"target": {"entity_id": "person.a"},
|
||||
"options": {"zone": "zone.home"},
|
||||
},
|
||||
["person.a", "zone.home"],
|
||||
id="zone-left-modern",
|
||||
),
|
||||
pytest.param(
|
||||
{"platform": "geo_location", "zone": "zone.home", "source": "test"},
|
||||
{"platform": "geo_location", "zone": "zone.home"},
|
||||
["zone.home"],
|
||||
id="geo_location",
|
||||
),
|
||||
pytest.param(
|
||||
{"platform": "sun", "event": "sunrise"},
|
||||
{"platform": "sun"},
|
||||
["sun.sun"],
|
||||
id="sun",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "event",
|
||||
"event_type": "test_event",
|
||||
"event_data": {"entity_id": "sensor.x"},
|
||||
},
|
||||
{"platform": "event", "event_data": {"entity_id": "sensor.x"}},
|
||||
["sensor.x"],
|
||||
id="event-with-entity-id",
|
||||
),
|
||||
pytest.param(
|
||||
{"platform": "event", "event_type": "test_event"},
|
||||
{"platform": "event"},
|
||||
[],
|
||||
id="event-without-entity-id",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "event",
|
||||
"event_type": "test_event",
|
||||
"event_data": {"entity_id": "not-a-valid-entity-id"},
|
||||
},
|
||||
[],
|
||||
@@ -4736,7 +4677,6 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "event",
|
||||
"event_type": "test_event",
|
||||
"event_data": {"entity_id": ["sensor.x", "sensor.y"]},
|
||||
},
|
||||
[],
|
||||
@@ -4767,89 +4707,38 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_async_extract_entities(
|
||||
hass: HomeAssistant,
|
||||
trigger_conf: dict[str, Any],
|
||||
expected: list[str],
|
||||
def test_async_extract_entities(
|
||||
trigger_conf: dict[str, Any], expected: list[str]
|
||||
) -> None:
|
||||
"""Test extracting entities from various trigger config shapes."""
|
||||
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
|
||||
assert trigger.async_extract_entities(trigger_conf) == expected
|
||||
|
||||
|
||||
_MOCK_DEVICE_ID = "_mock_device_id_"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def mock_device_automation(hass: HomeAssistant) -> str:
|
||||
"""Register a mock 'test' integration, device_trigger platform, and device.
|
||||
|
||||
Returns the device id, which tests substitute for _MOCK_DEVICE_ID in their
|
||||
parametrized configs so the device platform branch can validate properly.
|
||||
"""
|
||||
mock_integration(hass, MockModule("test"))
|
||||
mock_platform(
|
||||
hass,
|
||||
"test.device_trigger",
|
||||
Mock(
|
||||
TRIGGER_SCHEMA=DEVICE_TRIGGER_BASE_SCHEMA.extend({}, extra=vol.ALLOW_EXTRA)
|
||||
),
|
||||
)
|
||||
config_entry = MockConfigEntry(domain="test")
|
||||
config_entry.add_to_hass(hass)
|
||||
device = dr.async_get(hass).async_get_or_create(
|
||||
config_entry_id=config_entry.entry_id,
|
||||
identifiers={("test", "test")},
|
||||
)
|
||||
return device.id
|
||||
|
||||
|
||||
def _substitute(obj: Any, placeholder: str, value: str) -> Any:
|
||||
"""Recursively replace `placeholder` with `value` inside lists/dicts."""
|
||||
if isinstance(obj, dict):
|
||||
return {k: _substitute(v, placeholder, value) for k, v in obj.items()}
|
||||
if isinstance(obj, list):
|
||||
return [_substitute(v, placeholder, value) for v in obj]
|
||||
return value if obj == placeholder else obj
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("trigger_conf", "expected"),
|
||||
[
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "device",
|
||||
"device_id": _MOCK_DEVICE_ID,
|
||||
"domain": "test",
|
||||
},
|
||||
[_MOCK_DEVICE_ID],
|
||||
{"platform": "device", "device_id": "abc123"},
|
||||
["abc123"],
|
||||
id="device",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "event",
|
||||
"event_type": "test_event",
|
||||
"event_data": {"device_id": "abc123"},
|
||||
},
|
||||
{"platform": "event", "event_data": {"device_id": "abc123"}},
|
||||
["abc123"],
|
||||
id="event-with-device-id",
|
||||
),
|
||||
pytest.param(
|
||||
{"platform": "event", "event_type": "test_event"},
|
||||
{"platform": "event"},
|
||||
[],
|
||||
id="event-without-device-id",
|
||||
),
|
||||
pytest.param(
|
||||
{
|
||||
"platform": "tag",
|
||||
"tag_id": "mytag",
|
||||
"device_id": ["abc123", "def456"],
|
||||
},
|
||||
{"platform": "tag", "device_id": ["abc123", "def456"]},
|
||||
["abc123", "def456"],
|
||||
id="tag-with-device-id",
|
||||
),
|
||||
pytest.param(
|
||||
{"platform": "tag", "tag_id": "mytag"},
|
||||
{"platform": "tag"},
|
||||
[],
|
||||
id="tag-without-device-id",
|
||||
),
|
||||
@@ -4873,15 +4762,8 @@ def _substitute(obj: Any, placeholder: str, value: str) -> Any:
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_async_extract_devices(
|
||||
hass: HomeAssistant,
|
||||
mock_test_modern_trigger: None,
|
||||
mock_device_automation: str,
|
||||
trigger_conf: dict[str, Any],
|
||||
expected: list[str],
|
||||
def test_async_extract_devices(
|
||||
trigger_conf: dict[str, Any], expected: list[str]
|
||||
) -> None:
|
||||
"""Test extracting devices from various trigger config shapes."""
|
||||
trigger_conf = _substitute(trigger_conf, _MOCK_DEVICE_ID, mock_device_automation)
|
||||
expected = _substitute(expected, _MOCK_DEVICE_ID, mock_device_automation)
|
||||
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
|
||||
assert trigger.async_extract_devices(trigger_conf) == expected
|
||||
|
||||
Reference in New Issue
Block a user