Compare commits

..

6 Commits

Author SHA1 Message Date
Linkplay2020 d9a89beb3d Bump wiim to 1.0.4 (#172334)
Co-authored-by: Tao Jiang <tao.jiang@linkplay.com>
2026-05-28 11:38:22 +02:00
Ludovic BOUÉ 41f783f14d Add Matter soil moisture sensor (#172372) 2026-05-28 11:03:58 +02:00
Erik Montnemery 35397b818d Deprecate device tracker battery_level property (#171819)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-05-28 10:54:08 +02:00
Erik Montnemery d42d02f20a Revert "Add zone triggers entered/left zone" (#172409) 2026-05-28 10:32:28 +02:00
Franck Nijhof 99c445f261 Bump version to 2026.7.0dev0 (#172367) 2026-05-28 10:20:00 +02:00
Stefan Agner 567fe85828 Reject backup uploads with unsafe inner name (#172368)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-28 10:19:06 +02:00
28 changed files with 451 additions and 969 deletions
+1 -1
View File
@@ -39,7 +39,7 @@ on:
env:
CACHE_VERSION: 3
MYPY_CACHE_VERSION: 1
HA_SHORT_VERSION: "2026.6"
HA_SHORT_VERSION: "2026.7"
ADDITIONAL_PYTHON_VERSIONS: "[]"
# 10.3 is the oldest supported version
# - 10.3.32 is the version currently shipped with Synology (as of 17 Feb 2022)
+16 -3
View File
@@ -11,7 +11,7 @@ from homeassistant.helpers.hassio import is_hassio
from .agent import BackupAgent, LocalBackupAgent, OnProgressCallback
from .const import DOMAIN, LOGGER
from .models import AgentBackup, BackupNotFound
from .models import AgentBackup, BackupNotFound, InvalidBackupFilename
from .util import read_backup, suggested_filename
@@ -54,7 +54,13 @@ class CoreLocalBackupAgent(LocalBackupAgent):
try:
backup = read_backup(backup_path)
backups[backup.backup_id] = (backup, backup_path)
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
except (
OSError,
TarError,
json.JSONDecodeError,
KeyError,
InvalidBackupFilename,
) as err:
LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
return backups
@@ -122,7 +128,14 @@ class CoreLocalBackupAgent(LocalBackupAgent):
def get_new_backup_path(self, backup: AgentBackup) -> Path:
"""Return the local path to a new backup."""
return self._backup_dir / suggested_filename(backup)
candidate = self._backup_dir / suggested_filename(backup)
# suggested_filename does not strip separators; refuse paths that would
# land outside the backup directory.
if candidate.parent != self._backup_dir:
raise InvalidBackupFilename(
f"Refusing to write outside {self._backup_dir}: {candidate}"
)
return candidate
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
"""Delete a backup file."""
+7 -1
View File
@@ -1978,7 +1978,13 @@ class CoreBackupReaderWriter(BackupReaderWriter):
try:
backup = await async_add_executor_job(read_backup, temp_file)
except (OSError, tarfile.TarError, json.JSONDecodeError, KeyError) as err:
except (
OSError,
tarfile.TarError,
json.JSONDecodeError,
KeyError,
InvalidBackupFilename,
) as err:
LOGGER.warning("Unable to parse backup %s: %s", temp_file, err)
raise
+10 -3
View File
@@ -6,7 +6,7 @@ import copy
from dataclasses import dataclass, replace
from io import BytesIO
import json
from pathlib import Path, PurePath
from pathlib import Path, PurePath, PureWindowsPath
from queue import SimpleQueue
import tarfile
import threading
@@ -34,7 +34,7 @@ from homeassistant.util.async_iterator import (
from homeassistant.util.json import JsonObjectType, json_loads_object
from .const import BUF_SIZE, LOGGER, SECURETAR_CREATE_VERSION
from .models import AddonInfo, AgentBackup, Folder
from .models import AddonInfo, AgentBackup, Folder, InvalidBackupFilename
class DecryptError(HomeAssistantError):
@@ -109,6 +109,13 @@ def read_backup(backup_path: Path) -> AgentBackup:
extra_metadata = cast(dict[str, bool | str], data.get("extra", {}))
date = extra_metadata.get("supervisor.backup_request_date", data["date"])
name = cast(str, data["name"])
# The name is used to derive the on-disk filename via suggested_filename;
# reject anything that could escape the backup directory.
safe_name = PureWindowsPath(name).name
if safe_name != name or name in ("", ".", ".."):
raise InvalidBackupFilename(f"Invalid backup name: {name!r}")
return AgentBackup(
addons=addons,
backup_id=cast(str, data["slug"]),
@@ -118,7 +125,7 @@ def read_backup(backup_path: Path) -> AgentBackup:
folders=folders,
homeassistant_included=homeassistant_included,
homeassistant_version=homeassistant_version,
name=cast(str, data["name"]),
name=name,
protected=cast(bool, data.get("protected", False)),
size=backup_path.stat().st_size,
)
@@ -12,18 +12,13 @@ from homeassistant.const import (
CONF_DOMAIN,
CONF_ENTITY_ID,
CONF_EVENT,
CONF_OPTIONS,
CONF_PLATFORM,
CONF_TYPE,
CONF_ZONE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.trigger import (
TriggerActionType,
TriggerInfo,
_async_attach_trigger_cls,
)
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
@@ -84,18 +79,16 @@ async def async_attach_trigger(
event = zone.EVENT_ENTER
else:
event = zone.EVENT_LEAVE
zone_config = await zone.LegacyZoneTrigger.async_validate_config(
hass,
{
CONF_OPTIONS: {
CONF_ENTITY_ID: [config[CONF_ENTITY_ID]],
CONF_ZONE: config[CONF_ZONE],
CONF_EVENT: event,
}
},
)
return await _async_attach_trigger_cls(
hass, zone.LegacyZoneTrigger, "device", zone_config, action, trigger_info
zone_config = {
CONF_PLATFORM: ZONE_DOMAIN,
CONF_ENTITY_ID: config[CONF_ENTITY_ID],
CONF_ZONE: config[CONF_ZONE],
CONF_EVENT: event,
}
zone_config = await zone.async_validate_trigger_config(hass, zone_config)
return await zone.async_attach_trigger(
hass, zone_config, action, trigger_info, platform_type="device"
)
@@ -169,11 +169,35 @@ class BaseTrackerEntity(Entity):
_attr_entity_category = EntityCategory.DIAGNOSTIC
_attr_source_type: SourceType
def __init_subclass__(cls, **kwargs: Any) -> None:
"""Post initialisation processing."""
super().__init_subclass__(**kwargs)
if "battery_level" in cls.__dict__:
if cls.__module__.startswith("homeassistant.components."):
# Don't ask users to report issue for built in integrations,
# they already have issues opened on them.
return
report_issue = async_suggest_report_issue(
async_get_hass_or_none(), module=cls.__module__
)
_LOGGER.warning(
(
"%s::%s is overriding the deprecated battery_level property on "
"a subclass of BaseTrackerEntity, this will be unsupported from "
"Home Assistant 2027.7, please %s"
),
cls.__module__,
cls.__name__,
report_issue,
)
@cached_property
def battery_level(self) -> int | None:
"""Return the battery level of the device.
Percentage from 0-100.
The property is deprecated and will be removed in Home Assistant 2027.7.
"""
return None
+13
View File
@@ -439,6 +439,19 @@ DISCOVERY_SCHEMAS = [
),
allow_multi=True, # also used for climate entity
),
MatterDiscoverySchema(
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
key="SoilMoistureSensor",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.MOISTURE,
state_class=SensorStateClass.MEASUREMENT,
),
entity_class=MatterSensor,
required_attributes=(
clusters.SoilMeasurement.Attributes.SoilMoistureMeasuredValue,
),
),
MatterDiscoverySchema(
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
+1 -1
View File
@@ -8,6 +8,6 @@
"iot_class": "local_push",
"loggers": ["wiim.sdk", "async_upnp_client"],
"quality_scale": "bronze",
"requirements": ["wiim==0.1.2"],
"requirements": ["wiim==0.1.4"],
"zeroconf": ["_linkplay._tcp.local."]
}
@@ -349,15 +349,12 @@ class WiimMediaPlayerEntity(WiimBaseEntity, MediaPlayerEntity):
sdk_status_str,
)
else:
self._device.playing_status = sdk_status
if sdk_status == SDKPlayingStatus.STOPPED:
LOGGER.debug(
"Device %s: TransportState is STOPPED."
" Resetting media position and metadata",
self.entity_id,
)
self._device.current_position = 0
self._device.current_track_duration = 0
self._attr_media_position_updated_at = None
self._attr_media_duration = None
self._attr_media_position = None
-14
View File
@@ -3,19 +3,5 @@
"reload": {
"service": "mdi:reload"
}
},
"triggers": {
"entered": {
"trigger": "mdi:map-marker-plus"
},
"left": {
"trigger": "mdi:map-marker-minus"
},
"occupancy_cleared": {
"trigger": "mdi:account-off"
},
"occupancy_detected": {
"trigger": "mdi:account-group"
}
}
}
@@ -1,74 +1,8 @@
{
"common": {
"trigger_behavior_name": "Trigger when",
"trigger_for_name": "For at least",
"trigger_zone_description": "The zone to trigger on.",
"trigger_zone_name": "Zone"
},
"services": {
"reload": {
"description": "Reloads zones from the YAML-configuration.",
"name": "Reload zones"
}
},
"triggers": {
"entered": {
"description": "Triggers when one or more persons or device trackers enter a zone.",
"fields": {
"behavior": {
"name": "[%key:component::zone::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::common::trigger_zone_description%]",
"name": "[%key:component::zone::common::trigger_zone_name%]"
}
},
"name": "Entered zone"
},
"left": {
"description": "Triggers when one or more persons or device trackers leave a zone.",
"fields": {
"behavior": {
"name": "[%key:component::zone::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::common::trigger_zone_description%]",
"name": "[%key:component::zone::common::trigger_zone_name%]"
}
},
"name": "Left zone"
},
"occupancy_cleared": {
"description": "Triggers when a zone transitions from occupied to unoccupied.",
"fields": {
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::triggers::occupancy_detected::fields::zone::description%]",
"name": "[%key:component::zone::triggers::occupancy_detected::fields::zone::name%]"
}
},
"name": "Zone occupancy cleared"
},
"occupancy_detected": {
"description": "Triggers when a zone transitions to an occupied state.",
"fields": {
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "The zone to monitor.",
"name": "Zone"
}
},
"name": "Zone occupancy detected"
}
}
}
+76 -229
View File
@@ -1,26 +1,22 @@
"""Offer zone automation rules."""
import logging
from typing import TYPE_CHECKING, Any, cast
import voluptuous as vol
from homeassistant.components.device_tracker import ATTR_IN_ZONES
from homeassistant.const import (
ATTR_FRIENDLY_NAME,
CONF_ENTITY_ID,
CONF_EVENT,
CONF_FOR,
CONF_OPTIONS,
CONF_TARGET,
CONF_PLATFORM,
CONF_ZONE,
)
from homeassistant.core import (
CALLBACK_TYPE,
Event,
EventStateChangedData,
HassJob,
HomeAssistant,
State,
callback,
)
from homeassistant.helpers import (
@@ -28,18 +24,8 @@ from homeassistant.helpers import (
entity_registry as er,
location,
)
from homeassistant.helpers.automation import (
DomainSpec,
move_top_level_schema_fields_to_options,
)
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR,
EntityTriggerBase,
Trigger,
TriggerActionRunner,
TriggerConfig,
)
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.typing import ConfigType
from . import condition
@@ -52,232 +38,93 @@ _LOGGER = logging.getLogger(__name__)
_EVENT_DESCRIPTION = {EVENT_ENTER: "entering", EVENT_LEAVE: "leaving"}
_LEGACY_OPTIONS_SCHEMA: dict[vol.Marker, Any] = {
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
vol.Required(CONF_ZONE): cv.entity_id,
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(EVENT_ENTER, EVENT_LEAVE),
}
_LEGACY_TRIGGER_OPTIONS_SCHEMA = vol.Schema(
_TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
{
vol.Required(CONF_OPTIONS): _LEGACY_OPTIONS_SCHEMA,
},
)
# New-style zone trigger schema
_ZONE_TRIGGER_SCHEMA = ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR.extend(
{
vol.Required(CONF_OPTIONS): {
vol.Required(CONF_ZONE): cv.entity_domain("zone"),
},
vol.Required(CONF_PLATFORM): "zone",
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
vol.Required(CONF_ZONE): cv.entity_id,
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(
EVENT_ENTER, EVENT_LEAVE
),
}
)
_DOMAIN_SPECS: dict[str, DomainSpec] = {
"person": DomainSpec(),
"device_tracker": DomainSpec(),
}
async def async_validate_trigger_config(
hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate trigger config."""
config = _TRIGGER_SCHEMA(config)
registry = er.async_get(hass)
config[CONF_ENTITY_ID] = er.async_validate_entity_ids(
registry, config[CONF_ENTITY_ID]
)
return config
class LegacyZoneTrigger(Trigger):
"""Legacy zone trigger (platform: zone)."""
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: TriggerActionType,
trigger_info: TriggerInfo,
*,
platform_type: str = "zone",
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
trigger_data = trigger_info["trigger_data"]
entity_id: list[str] = config[CONF_ENTITY_ID]
zone_entity_id: str = config[CONF_ZONE]
event: str = config[CONF_EVENT]
job = HassJob(action)
@classmethod
async def async_validate_complete_config(
cls, hass: HomeAssistant, complete_config: ConfigType
) -> ConfigType:
"""Validate complete config, migrating legacy format to options."""
complete_config = move_top_level_schema_fields_to_options(
complete_config, _LEGACY_OPTIONS_SCHEMA
)
return await super().async_validate_complete_config(hass, complete_config)
@callback
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
"""Listen for state changes and calls action."""
entity = zone_event.data["entity_id"]
from_s = zone_event.data["old_state"]
to_s = zone_event.data["new_state"]
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
config = cast(ConfigType, _LEGACY_TRIGGER_OPTIONS_SCHEMA(config))
registry = er.async_get(hass)
config[CONF_OPTIONS][CONF_ENTITY_ID] = er.async_validate_entity_ids(
registry, config[CONF_OPTIONS][CONF_ENTITY_ID]
)
return config
if (from_s and not location.has_location(from_s)) or (
to_s and not location.has_location(to_s)
):
return
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
self._options = config.options
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
entity_id: list[str] = self._options[CONF_ENTITY_ID]
zone_entity_id: str = self._options[CONF_ZONE]
event: str = self._options[CONF_EVENT]
@callback
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
"""Listen for state changes and calls action."""
entity = zone_event.data["entity_id"]
from_s = zone_event.data["old_state"]
to_s = zone_event.data["new_state"]
if (from_s and not location.has_location(from_s)) or (
to_s and not location.has_location(to_s)
):
return
if not (zone_state := self._hass.states.get(zone_entity_id)):
_LOGGER.warning(
"Non-existing zone '%s' in a zone trigger",
zone_entity_id,
)
return
from_match = (
condition.zone(self._hass, zone_state, from_s) if from_s else False
if not (zone_state := hass.states.get(zone_entity_id)):
_LOGGER.warning(
(
"Automation '%s' is referencing non-existing zone '%s' in a zone"
" trigger"
),
trigger_info["name"],
zone_entity_id,
)
to_match = condition.zone(self._hass, zone_state, to_s) if to_s else False
return
if (event == EVENT_ENTER and not from_match and to_match) or (
event == EVENT_LEAVE and from_match and not to_match
):
description = f"{entity} {_EVENT_DESCRIPTION[event]} {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
run_action(
{
from_match = condition.zone(hass, zone_state, from_s) if from_s else False
to_match = condition.zone(hass, zone_state, to_s) if to_s else False
if (event == EVENT_ENTER and not from_match and to_match) or (
event == EVENT_LEAVE and from_match and not to_match
):
description = (
f"{entity} {_EVENT_DESCRIPTION[event]}"
f" {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
)
hass.async_run_hass_job(
job,
{
"trigger": {
**trigger_data,
"platform": platform_type,
"entity_id": entity,
"from_state": from_s,
"to_state": to_s,
"zone": zone_state,
"event": event,
},
description,
to_s.context if to_s else None,
)
"description": description,
}
},
to_s.context if to_s else None,
)
return async_track_state_change_event(
self._hass, entity_id, zone_automation_listener
)
class ZoneTriggerBase(EntityTriggerBase):
"""Base for zone-based triggers targeting person and device_tracker entities."""
_domain_specs = _DOMAIN_SPECS
_schema = _ZONE_TRIGGER_SCHEMA
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the trigger."""
super().__init__(hass, config)
self._zone: str = self._options[CONF_ZONE]
def _in_target_zone(self, state: State) -> bool:
"""Check if the entity is in the selected zone."""
in_zones = state.attributes.get(ATTR_IN_ZONES) or ()
return self._zone in in_zones
class EnteredZoneTrigger(ZoneTriggerBase):
"""Trigger when an entity enters the selected zone."""
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the entity was not already in the selected zone."""
return not self._in_target_zone(from_state)
def is_valid_state(self, state: State) -> bool:
"""Check that the entity is now in the selected zone."""
return self._in_target_zone(state)
class LeftZoneTrigger(ZoneTriggerBase):
"""Trigger when an entity leaves the selected zone."""
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the entity was previously in the selected zone."""
return self._in_target_zone(from_state)
def is_valid_state(self, state: State) -> bool:
"""Check that the entity is no longer in the selected zone."""
return not self._in_target_zone(state)
_OCCUPANCY_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_OPTIONS, default={}): {
vol.Required(CONF_ZONE): cv.entity_id,
vol.Optional(CONF_FOR): cv.positive_time_period,
},
}
)
class _ZoneOccupancyTriggerBase(EntityTriggerBase):
"""Base for zone occupancy triggers (single zone, no behavior)."""
_domain_specs = {"zone": DomainSpec()}
_schema = _OCCUPANCY_TRIGGER_SCHEMA
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config and synthesize a target from the zone option."""
config = cast(ConfigType, cls._schema(config))
config[CONF_TARGET] = {CONF_ENTITY_ID: [config[CONF_OPTIONS][CONF_ZONE]]}
return config
@staticmethod
def _occupancy_count(state: State) -> int | None:
"""Return the zone's persons-in-zone count; None if unparsable."""
try:
return int(state.state)
except TypeError, ValueError:
return None
@classmethod
def _is_occupied(cls, state: State) -> bool:
"""Return True if the zone has at least one occupant."""
count = cls._occupancy_count(state)
return count is not None and count >= 1
class OccupancyDetectedTrigger(_ZoneOccupancyTriggerBase):
"""Trigger when a zone transitions to an occupied state."""
def is_valid_state(self, state: State) -> bool:
"""Check that the zone is occupied."""
return self._is_occupied(state)
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the zone was previously not occupied."""
return not self._is_occupied(from_state)
class OccupancyClearedTrigger(_ZoneOccupancyTriggerBase):
"""Trigger when a zone transitions from occupied to unoccupied."""
def is_valid_state(self, state: State) -> bool:
"""Check that the zone is empty (count == 0)."""
return self._occupancy_count(state) == 0
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the zone was previously occupied."""
return self._is_occupied(from_state)
TRIGGERS: dict[str, type[Trigger]] = {
"_": LegacyZoneTrigger,
"entered": EnteredZoneTrigger,
"left": LeftZoneTrigger,
"occupancy_detected": OccupancyDetectedTrigger,
"occupancy_cleared": OccupancyClearedTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for zones."""
return TRIGGERS
return async_track_state_change_event(hass, entity_id, zone_automation_listener)
@@ -1,42 +0,0 @@
.trigger_zone: &trigger_zone
target:
entity:
domain:
- person
- device_tracker
fields:
behavior:
required: true
default: each
selector:
automation_behavior:
mode: trigger
for:
required: true
default: 00:00:00
selector:
duration:
zone:
required: true
selector:
entity:
domain: zone
entered: *trigger_zone
left: *trigger_zone
.trigger_occupancy: &trigger_occupancy
fields:
for:
required: true
default: 00:00:00
selector:
duration:
zone:
required: true
selector:
entity:
domain: zone
occupancy_detected: *trigger_occupancy
occupancy_cleared: *trigger_occupancy
+1 -1
View File
@@ -14,7 +14,7 @@ if TYPE_CHECKING:
APPLICATION_NAME: Final = "HomeAssistant"
MAJOR_VERSION: Final = 2026
MINOR_VERSION: Final = 6
MINOR_VERSION: Final = 7
PATCH_VERSION: Final = "0.dev0"
__short_version__: Final = f"{MAJOR_VERSION}.{MINOR_VERSION}"
__version__: Final = f"{__short_version__}.{PATCH_VERSION}"
+1 -8
View File
@@ -1714,14 +1714,7 @@ def async_extract_entities(trigger_conf: dict) -> list[str]:
return [trigger_conf[CONF_OPTIONS][CONF_ENTITY_ID]]
if trigger_conf[CONF_PLATFORM] == "zone":
options = trigger_conf[CONF_OPTIONS]
return [*options[CONF_ENTITY_ID], options[CONF_ZONE]]
if trigger_conf[CONF_PLATFORM] in ("zone.entered", "zone.left"):
return [
*async_extract_targets(trigger_conf, CONF_ENTITY_ID),
trigger_conf[CONF_OPTIONS][CONF_ZONE],
]
return trigger_conf[CONF_ENTITY_ID] + [trigger_conf[CONF_ZONE]] # type: ignore[no-any-return]
if trigger_conf[CONF_PLATFORM] == "geo_location":
return [trigger_conf[CONF_ZONE]]
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "homeassistant"
version = "2026.6.0.dev0"
version = "2026.7.0.dev0"
license = "Apache-2.0"
license-files = ["LICENSE*", "homeassistant/backports/LICENSE*"]
description = "Open-source home automation platform running on Python 3."
+1 -1
View File
@@ -3357,7 +3357,7 @@ whois==0.9.27
wiffi==1.1.2
# homeassistant.components.wiim
wiim==0.1.2
wiim==0.1.4
# homeassistant.components.wirelesstag
wirelesstagpy==0.8.1
+30
View File
@@ -2088,6 +2088,36 @@ async def test_receive_backup_path_traversal(
assert resp.status == 400
@pytest.mark.parametrize(
"name",
[
"/absolute/path",
"../parent",
"with/slash",
],
)
async def test_receive_backup_rejects_unsafe_inner_name(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
name: str,
) -> None:
"""Test receive backup rejects an inner name that would escape the backup dir."""
await setup_backup_integration(hass)
client = await hass_client()
backup = replace(TEST_BACKUP_ABC123, name=name)
with patch(
"homeassistant.components.backup.manager.read_backup",
return_value=backup,
):
resp = await client.post(
"/api/backup/upload?agent_id=backup.local",
data={"file": StringIO("test")},
)
assert resp.status == 400
async def test_receive_backup_busy_manager(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
+32
View File
@@ -14,6 +14,7 @@ import pytest
import securetar
from homeassistant.components.backup import DOMAIN, AddonInfo, AgentBackup, Folder
from homeassistant.components.backup.models import InvalidBackupFilename
from homeassistant.components.backup.util import (
DecryptedBackupStreamer,
EncryptedBackupStreamer,
@@ -158,6 +159,37 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
assert backup == expected_backup
@pytest.mark.parametrize(
"name",
[
"/absolute/path",
"../parent",
"with/slash",
"with\\backslash",
"C:\\drive\\path",
"",
".",
"..",
],
)
def test_read_backup_rejects_unsafe_name(name: str) -> None:
"""Test that read_backup rejects names that could escape the backup directory."""
backup_json_content = (
b'{"compressed":true,"date":"2024-12-02T07:23:58.261875-05:00","homeassistant":'
b'{"exclude_database":true,"version":"2024.12.0.dev0"},"name":"'
+ name.encode().replace(b"\\", b"\\\\")
+ b'","protected":true,"slug":"455645fe","type":"partial","version":2}'
)
mock_path = Mock()
mock_path.stat.return_value.st_size = 1234
with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
tar_ctx = mock_open_tar.return_value.__enter__.return_value
tar_ctx.extractfile.return_value.read.return_value = backup_json_content
with pytest.raises(InvalidBackupFilename):
read_backup(mock_path)
@pytest.mark.parametrize(
("backup", "password", "validation_result", "expected_messages"),
[
+10 -29
View File
@@ -1586,12 +1586,12 @@ async def _validate_trigger_options(
options: dict[str, Any] | None,
*,
valid: bool,
supports_target: bool = True,
) -> None:
"""Assert that a trigger accepts or rejects the given options during validation."""
trigger_config: dict[str, Any] = {CONF_PLATFORM: trigger}
if supports_target:
trigger_config[CONF_TARGET] = {ATTR_LABEL_ID: "test_label"}
trigger_config: dict[str, Any] = {
CONF_PLATFORM: trigger,
CONF_TARGET: {ATTR_LABEL_ID: "test_label"},
}
if options is not None:
trigger_config[CONF_OPTIONS] = options
if valid:
@@ -1608,7 +1608,6 @@ async def assert_trigger_options_supported(
*,
supports_behavior: bool,
supports_duration: bool,
supports_target: bool = True,
) -> None:
"""Assert which options a trigger supports.
@@ -1625,15 +1624,9 @@ async def assert_trigger_options_supported(
# Minimal config should always be valid
supports_empty = not bool(base_options)
await _validate_trigger_options(
hass, trigger, None, valid=supports_empty, supports_target=supports_target
)
await _validate_trigger_options(
hass, trigger, {}, valid=supports_empty, supports_target=supports_target
)
await _validate_trigger_options(
hass, trigger, base_options, valid=True, supports_target=supports_target
)
await _validate_trigger_options(hass, trigger, None, valid=supports_empty)
await _validate_trigger_options(hass, trigger, {}, valid=supports_empty)
await _validate_trigger_options(hass, trigger, base_options, valid=True)
def _merge(extra: dict[str, Any]) -> dict[str, Any]:
return {**(base_options or {}), **extra}
@@ -1641,30 +1634,18 @@ async def assert_trigger_options_supported(
# Behavior
for behavior in ("each", "first", "all"):
await _validate_trigger_options(
hass,
trigger,
_merge({"behavior": behavior}),
valid=supports_behavior,
supports_target=supports_target,
hass, trigger, _merge({"behavior": behavior}), valid=supports_behavior
)
# Duration
for for_value in ({"seconds": 5}, "00:00:05", 5):
await _validate_trigger_options(
hass,
trigger,
_merge({"for": for_value}),
valid=supports_duration,
supports_target=supports_target,
hass, trigger, _merge({"for": for_value}), valid=supports_duration
)
# Unknown option should always be rejected
await _validate_trigger_options(
hass,
trigger,
_merge({"unknown_option": True}),
valid=False,
supports_target=supports_target,
hass, trigger, _merge({"unknown_option": True}), valid=False
)
@@ -1379,6 +1379,31 @@ def test_base_tracker_entity() -> None:
entity.state_attributes # noqa: B018
def test_battery_level_override_deprecation_warning(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that overriding battery_level in a subclass logs a warning."""
error_message = "is overriding the deprecated battery_level property"
caplog.clear()
class _SubclassWithOverride(TrackerEntity):
@property
def battery_level(self) -> int | None:
return 50
assert error_message in caplog.text
assert _SubclassWithOverride.__name__ in caplog.text
# No warning for a subclass that does not override battery_level
caplog.clear()
class _SubclassWithoutOverride(TrackerEntity):
pass
assert error_message not in caplog.text
async def test_attr_location_name_deprecation_warning(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
+1
View File
@@ -78,6 +78,7 @@ FIXTURES = [
"mock_pressure_sensor",
"mock_pump",
"mock_room_airconditioner",
"mock_soil_sensor",
"mock_solar_inverter",
"mock_speaker",
"mock_switch_unit",
@@ -0,0 +1,87 @@
{
"node_id": 101,
"date_commissioned": "2024-11-27T00:00:00.000000",
"last_interview": "2024-11-27T00:00:00.000000",
"interview_version": 2,
"attributes": {
"0/29/0": [
{
"0": 22,
"1": 1
}
],
"0/29/1": [
4, 29, 31, 40, 42, 43, 44, 48, 49, 50, 51, 52, 53, 54, 55, 59, 60, 62, 63,
64, 65
],
"0/29/2": [41],
"0/29/3": [1],
"0/29/65532": 0,
"0/29/65533": 1,
"0/29/65528": [],
"0/29/65529": [],
"0/29/65531": [0, 1, 2, 3, 65528, 65529, 65531, 65532, 65533],
"0/40/0": 1,
"0/40/1": "Nabu Casa",
"0/40/2": 65521,
"0/40/3": "Mock SoilSensor",
"0/40/4": 32768,
"0/40/5": "Mock Soil Sensor",
"0/40/6": "XX",
"0/40/7": 0,
"0/40/8": "v1.0",
"0/40/9": 1,
"0/40/10": "v1.0",
"0/40/11": "20241127",
"0/40/12": "",
"0/40/13": "",
"0/40/14": "",
"0/40/15": "TEST_SN",
"0/40/16": false,
"0/40/17": true,
"0/40/18": "mock-soil-sensor",
"0/40/19": {
"0": 3,
"1": 3
},
"0/40/65532": 0,
"0/40/65533": 1,
"0/40/65528": [],
"0/40/65529": [],
"0/40/65531": [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
65528, 65529, 65531, 65532, 65533
],
"1/3/65529": [0, 64],
"1/3/65531": [0, 1, 65528, 65529, 65531, 65532, 65533],
"1/29/0": [
{
"0": 69,
"1": 1
}
],
"1/29/1": [3, 29, 57, 1072, 40],
"1/29/2": [],
"1/29/3": [9, 10],
"1/29/65532": null,
"1/29/65533": 1,
"1/29/65528": [],
"1/29/65529": [],
"1/29/65531": [0, 1, 2, 3, 65528, 65529, 65531, 65532, 65533],
"1/1072/0": {
"0": 17,
"1": true,
"2": 0,
"3": 100,
"4": []
},
"1/1072/1": 50,
"1/1072/65532": 0,
"1/1072/65533": 1,
"1/1072/65528": [],
"1/1072/65529": [],
"1/1072/65531": [0, 1, 65528, 65529, 65531, 65532, 65533]
},
"available": true,
"attribute_subscriptions": []
}
@@ -18619,6 +18619,61 @@
'state': '0.0',
})
# ---
# name: test_sensors[mock_soil_sensor][sensor.mock_soil_sensor_moisture-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.mock_soil_sensor_moisture',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Moisture',
'options': dict({
}),
'original_device_class': <SensorDeviceClass.MOISTURE: 'moisture'>,
'original_icon': None,
'original_name': 'Moisture',
'platform': 'matter',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '00000000000004D2-0000000000000065-MatterNodeDevice-1-SoilMoistureSensor-1072-1',
'unit_of_measurement': '%',
})
# ---
# name: test_sensors[mock_soil_sensor][sensor.mock_soil_sensor_moisture-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'moisture',
'friendly_name': 'Mock Soil Sensor Moisture',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'sensor.mock_soil_sensor_moisture',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '50',
})
# ---
# name: test_sensors[mock_solar_inverter][sensor.mock_solar_inverter_active_current-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
+19
View File
@@ -85,6 +85,25 @@ async def test_humidity_sensor(
assert state.state == "40.0"
@pytest.mark.parametrize("node_fixture", ["mock_soil_sensor"])
async def test_soil_moisture_sensor(
hass: HomeAssistant,
matter_client: MagicMock,
matter_node: MatterNode,
) -> None:
"""Test soil moisture sensor."""
state = hass.states.get("sensor.mock_soil_sensor_moisture")
assert state
assert state.state == "50"
set_node_attribute(matter_node, 1, 1072, 1, 75)
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("sensor.mock_soil_sensor_moisture")
assert state
assert state.state == "75"
@pytest.mark.parametrize("node_fixture", ["mock_light_sensor"])
async def test_light_sensor(
hass: HomeAssistant,
+1
View File
@@ -38,5 +38,6 @@ async def fire_transport_update(
"""Trigger the registered AVTransport callback on the mock device."""
assert mock_device.av_transport_event_callback is not None
mock_device.event_data = {"TransportState": transport_state.value}
mock_device.playing_status = transport_state
mock_device.av_transport_event_callback(MagicMock(), [])
await hass.async_block_till_done()
+9 -411
View File
@@ -1,36 +1,14 @@
"""The tests for the location automation."""
from datetime import timedelta
from typing import Any
from freezegun.api import FrozenDateTimeFactory
import pytest
import voluptuous as vol
from homeassistant.components import automation, zone
from homeassistant.const import (
ATTR_ENTITY_ID,
ENTITY_MATCH_ALL,
SERVICE_TURN_OFF,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.const import ATTR_ENTITY_ID, ENTITY_MATCH_ALL, SERVICE_TURN_OFF
from homeassistant.core import Context, HomeAssistant, ServiceCall
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.trigger import async_validate_trigger_config
from homeassistant.setup import async_setup_component
from tests.common import async_fire_time_changed, mock_component
from tests.components.common import (
TriggerStateDescription,
assert_trigger_behavior_all,
assert_trigger_behavior_each,
assert_trigger_behavior_first,
assert_trigger_options_supported,
parametrize_target_entities,
parametrize_trigger_states,
target_entities,
)
from tests.common import mock_component
@pytest.fixture(autouse=True)
@@ -365,7 +343,10 @@ async def test_unknown_zone(
},
)
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" not in caplog.text
assert (
"Automation 'My Automation' is referencing non-existing zone"
" 'zone.no_such_zone' in a zone trigger" not in caplog.text
)
hass.states.async_set(
"test.entity",
@@ -375,390 +356,7 @@ async def test_unknown_zone(
)
await hass.async_block_till_done()
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" in caplog.text
# --- New-style zone trigger tests ---
ZONE_HOME = "zone.home"
ZONE_WORK = "zone.work"
IN_ZONES_HOME = {"in_zones": [ZONE_HOME]}
IN_ZONES_WORK = {"in_zones": [ZONE_WORK]}
IN_ZONES_NONE: dict[str, list[str]] = {"in_zones": []}
TRIGGER_ZONE = ZONE_HOME
@pytest.mark.parametrize(
("trigger_key", "base_options", "supports_behavior", "supports_duration"),
[
("zone.entered", {"zone": TRIGGER_ZONE}, True, True),
("zone.left", {"zone": TRIGGER_ZONE}, True, True),
],
)
async def test_zone_trigger_options_validation(
hass: HomeAssistant,
trigger_key: str,
base_options: dict[str, Any] | None,
supports_behavior: bool,
supports_duration: bool,
) -> None:
"""Test that zone triggers support the expected options."""
await assert_trigger_options_supported(
hass,
trigger_key,
base_options,
supports_behavior=supports_behavior,
supports_duration=supports_duration,
assert (
"Automation 'My Automation' is referencing non-existing zone"
" 'zone.no_such_zone' in a zone trigger" in caplog.text
)
@pytest.mark.parametrize("trigger_key", ["zone.entered", "zone.left"])
async def test_zone_trigger_rejects_non_zone_entity_id(
hass: HomeAssistant, trigger_key: str
) -> None:
"""Test that the zone option must reference entities in the zone domain."""
with pytest.raises(vol.Invalid):
await async_validate_trigger_config(
hass,
[
{
"platform": trigger_key,
"target": {"entity_id": "person.alice"},
"options": {"zone": "person.alice"},
}
],
)
@pytest.fixture
async def target_zone_entities(
hass: HomeAssistant, domain: str
) -> dict[str, list[str]]:
"""Create multiple zone-trackable entities associated with different targets."""
return await target_entities(hass, domain, domain_excluded="sensor")
_ZONE_TRIGGER_STATES = [
*parametrize_trigger_states(
trigger="zone.entered",
trigger_options={"zone": TRIGGER_ZONE},
target_states=[
("home", IN_ZONES_HOME),
],
other_states=[
("not_home", IN_ZONES_NONE),
("Work", IN_ZONES_WORK),
],
),
*parametrize_trigger_states(
trigger="zone.left",
trigger_options={"zone": TRIGGER_ZONE},
target_states=[
("not_home", IN_ZONES_NONE),
("Work", IN_ZONES_WORK),
],
other_states=[
("home", IN_ZONES_HOME),
],
),
]
def _parametrize_zone_target_entities() -> list[tuple[dict[str, Any], str, int, str]]:
"""Parametrize target entities for all supported zone trigger domains."""
return [
(*params, domain)
for domain in ("person", "device_tracker")
for params in parametrize_target_entities(domain)
]
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_each(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when any targeted entity changes."""
await assert_trigger_behavior_each(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_first(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when first targeted entity changes."""
await assert_trigger_behavior_first(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_all(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when last targeted entity changes."""
await assert_trigger_behavior_all(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
# --- Zone occupancy trigger tests ---
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key"),
["zone.occupancy_detected", "zone.occupancy_cleared"],
)
async def test_zone_occupancy_trigger_options_validation(
hass: HomeAssistant,
trigger_key: str,
) -> None:
"""Test that occupancy triggers support the expected options."""
await assert_trigger_options_supported(
hass,
trigger_key,
{"zone": ZONE_HOME},
supports_behavior=False,
supports_duration=True,
supports_target=False,
)
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key", "from_state", "to_state", "should_fire"),
[
# occupancy_detected
pytest.param("zone.occupancy_detected", "0", "1", True, id="detected_0_to_1"),
pytest.param("zone.occupancy_detected", "0", "3", True, id="detected_0_to_3"),
pytest.param("zone.occupancy_detected", "1", "2", False, id="detected_1_to_2"),
pytest.param("zone.occupancy_detected", "2", "0", False, id="detected_2_to_0"),
pytest.param(
"zone.occupancy_detected",
STATE_UNKNOWN,
"1",
False,
id="detected_unknown_to_1",
),
pytest.param(
"zone.occupancy_detected",
STATE_UNAVAILABLE,
"1",
False,
id="detected_unavailable_to_1",
),
pytest.param(
"zone.occupancy_detected",
"0",
STATE_UNAVAILABLE,
False,
id="detected_0_to_unavailable",
),
# occupancy_cleared
pytest.param("zone.occupancy_cleared", "1", "0", True, id="cleared_1_to_0"),
pytest.param("zone.occupancy_cleared", "3", "0", True, id="cleared_3_to_0"),
pytest.param("zone.occupancy_cleared", "2", "1", False, id="cleared_2_to_1"),
pytest.param("zone.occupancy_cleared", "0", "1", False, id="cleared_0_to_1"),
pytest.param(
"zone.occupancy_cleared",
"1",
STATE_UNAVAILABLE,
False,
id="cleared_1_to_unavailable",
),
pytest.param(
"zone.occupancy_cleared",
"1",
STATE_UNKNOWN,
False,
id="cleared_1_to_unknown",
),
],
)
async def test_zone_occupancy_trigger_transitions(
hass: HomeAssistant,
service_calls: list[ServiceCall],
trigger_key: str,
from_state: str,
to_state: str,
should_fire: bool,
) -> None:
"""Test occupancy triggers fire on the expected numeric-state transitions."""
hass.states.async_set(ZONE_HOME, from_state)
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": trigger_key,
"options": {"zone": ZONE_HOME},
},
"action": {"service": "test.automation"},
}
},
)
hass.states.async_set(ZONE_HOME, to_state)
await hass.async_block_till_done()
assert (len(service_calls) == 1) is should_fire
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key", "from_value", "to_value", "revert_value"),
[
("zone.occupancy_detected", "0", "1", "0"),
("zone.occupancy_cleared", "1", "0", "1"),
],
)
async def test_zone_occupancy_trigger_for_duration(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
service_calls: list[ServiceCall],
trigger_key: str,
from_value: str,
to_value: str,
revert_value: str,
) -> None:
"""Test that `for` delays the firing and an early revert cancels it."""
hass.states.async_set(ZONE_HOME, from_value)
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": trigger_key,
"options": {"zone": ZONE_HOME, "for": {"seconds": 5}},
},
"action": {"service": "test.automation"},
}
},
)
# Transition, then revert before the duration elapses -> no fire.
hass.states.async_set(ZONE_HOME, to_value)
await hass.async_block_till_done()
hass.states.async_set(ZONE_HOME, revert_value)
await hass.async_block_till_done()
freezer.tick(timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(service_calls) == 0
# Transition and hold past the duration -> fire once.
hass.states.async_set(ZONE_HOME, to_value)
await hass.async_block_till_done()
freezer.tick(timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(service_calls) == 1
@pytest.mark.usefixtures("enable_labs_preview_features")
async def test_zone_occupancy_trigger_payload(
hass: HomeAssistant, service_calls: list[ServiceCall]
) -> None:
"""Test the payload exposed to the action template."""
hass.states.async_set(ZONE_HOME, "0")
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": "zone.occupancy_detected",
"options": {"zone": ZONE_HOME},
},
"action": {
"service": "test.automation",
"data_template": {
"some": (
"{{ trigger.entity_id }}"
" - {{ trigger.from_state.state }}"
" - {{ trigger.to_state.state }}"
" - {{ trigger.for }}"
)
},
},
}
},
)
hass.states.async_set(ZONE_HOME, "2")
await hass.async_block_till_done()
assert len(service_calls) == 1
assert service_calls[0].data["some"] == f"{ZONE_HOME} - 0 - 2 - None"
+19 -137
View File
@@ -14,7 +14,6 @@ from pytest_unordered import unordered
import voluptuous as vol
from homeassistant.components import automation
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
from homeassistant.components.sun import DOMAIN as SUN_DOMAIN
from homeassistant.components.system_health import DOMAIN as SYSTEM_HEALTH_DOMAIN
from homeassistant.components.tag import DOMAIN as TAG_DOMAIN
@@ -42,11 +41,7 @@ from homeassistant.core import (
callback,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
trigger,
)
from homeassistant.helpers import config_validation as cv, trigger
from homeassistant.helpers.automation import (
DomainSpec,
move_top_level_schema_fields_to_options,
@@ -83,7 +78,6 @@ from homeassistant.util.unit_conversion import TemperatureConverter
from homeassistant.util.yaml.loader import parse_yaml
from tests.common import (
MockConfigEntry,
MockModule,
MockPlatform,
async_fire_time_changed,
@@ -4624,34 +4618,6 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
unsub()
@pytest.fixture
def mock_test_modern_trigger(hass: HomeAssistant) -> None:
"""Register a mock 'test' integration and trigger platform exposing 'test.modern'."""
class MockModernTrigger(Trigger):
"""Mock modern trigger that accepts any options/target."""
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
return config
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
return lambda: None
async def async_get_triggers(
hass: HomeAssistant,
) -> dict[str, type[Trigger]]:
return {"modern": MockModernTrigger}
mock_integration(hass, MockModule("test"))
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
@pytest.mark.usefixtures("mock_test_modern_trigger")
@pytest.mark.parametrize(
("trigger_conf", "expected"),
[
@@ -4661,7 +4627,7 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
id="state",
),
pytest.param(
{"platform": "numeric_state", "entity_id": ["sensor.a"], "above": 5},
{"platform": "numeric_state", "entity_id": ["sensor.a"]},
["sensor.a"],
id="numeric_state",
),
@@ -4673,61 +4639,36 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
pytest.param(
{
"platform": "zone",
"options": {
"entity_id": ["person.a"],
"zone": "zone.home",
"event": "enter",
},
"entity_id": ["person.a"],
"zone": "zone.home",
"event": "enter",
},
["person.a", "zone.home"],
id="zone-legacy",
),
pytest.param(
{
"platform": "zone.entered",
"target": {"entity_id": ["person.a", "device_tracker.b"]},
"options": {"zone": "zone.home"},
},
["person.a", "device_tracker.b", "zone.home"],
id="zone-entered-modern",
),
pytest.param(
{
"platform": "zone.left",
"target": {"entity_id": "person.a"},
"options": {"zone": "zone.home"},
},
["person.a", "zone.home"],
id="zone-left-modern",
),
pytest.param(
{"platform": "geo_location", "zone": "zone.home", "source": "test"},
{"platform": "geo_location", "zone": "zone.home"},
["zone.home"],
id="geo_location",
),
pytest.param(
{"platform": "sun", "event": "sunrise"},
{"platform": "sun"},
["sun.sun"],
id="sun",
),
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": "sensor.x"},
},
{"platform": "event", "event_data": {"entity_id": "sensor.x"}},
["sensor.x"],
id="event-with-entity-id",
),
pytest.param(
{"platform": "event", "event_type": "test_event"},
{"platform": "event"},
[],
id="event-without-entity-id",
),
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": "not-a-valid-entity-id"},
},
[],
@@ -4736,7 +4677,6 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": ["sensor.x", "sensor.y"]},
},
[],
@@ -4767,89 +4707,38 @@ def mock_test_modern_trigger(hass: HomeAssistant) -> None:
),
],
)
async def test_async_extract_entities(
hass: HomeAssistant,
trigger_conf: dict[str, Any],
expected: list[str],
def test_async_extract_entities(
trigger_conf: dict[str, Any], expected: list[str]
) -> None:
"""Test extracting entities from various trigger config shapes."""
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
assert trigger.async_extract_entities(trigger_conf) == expected
_MOCK_DEVICE_ID = "_mock_device_id_"
@pytest.fixture
async def mock_device_automation(hass: HomeAssistant) -> str:
"""Register a mock 'test' integration, device_trigger platform, and device.
Returns the device id, which tests substitute for _MOCK_DEVICE_ID in their
parametrized configs so the device platform branch can validate properly.
"""
mock_integration(hass, MockModule("test"))
mock_platform(
hass,
"test.device_trigger",
Mock(
TRIGGER_SCHEMA=DEVICE_TRIGGER_BASE_SCHEMA.extend({}, extra=vol.ALLOW_EXTRA)
),
)
config_entry = MockConfigEntry(domain="test")
config_entry.add_to_hass(hass)
device = dr.async_get(hass).async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={("test", "test")},
)
return device.id
def _substitute(obj: Any, placeholder: str, value: str) -> Any:
"""Recursively replace `placeholder` with `value` inside lists/dicts."""
if isinstance(obj, dict):
return {k: _substitute(v, placeholder, value) for k, v in obj.items()}
if isinstance(obj, list):
return [_substitute(v, placeholder, value) for v in obj]
return value if obj == placeholder else obj
@pytest.mark.parametrize(
("trigger_conf", "expected"),
[
pytest.param(
{
"platform": "device",
"device_id": _MOCK_DEVICE_ID,
"domain": "test",
},
[_MOCK_DEVICE_ID],
{"platform": "device", "device_id": "abc123"},
["abc123"],
id="device",
),
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"device_id": "abc123"},
},
{"platform": "event", "event_data": {"device_id": "abc123"}},
["abc123"],
id="event-with-device-id",
),
pytest.param(
{"platform": "event", "event_type": "test_event"},
{"platform": "event"},
[],
id="event-without-device-id",
),
pytest.param(
{
"platform": "tag",
"tag_id": "mytag",
"device_id": ["abc123", "def456"],
},
{"platform": "tag", "device_id": ["abc123", "def456"]},
["abc123", "def456"],
id="tag-with-device-id",
),
pytest.param(
{"platform": "tag", "tag_id": "mytag"},
{"platform": "tag"},
[],
id="tag-without-device-id",
),
@@ -4873,15 +4762,8 @@ def _substitute(obj: Any, placeholder: str, value: str) -> Any:
),
],
)
async def test_async_extract_devices(
hass: HomeAssistant,
mock_test_modern_trigger: None,
mock_device_automation: str,
trigger_conf: dict[str, Any],
expected: list[str],
def test_async_extract_devices(
trigger_conf: dict[str, Any], expected: list[str]
) -> None:
"""Test extracting devices from various trigger config shapes."""
trigger_conf = _substitute(trigger_conf, _MOCK_DEVICE_ID, mock_device_automation)
expected = _substitute(expected, _MOCK_DEVICE_ID, mock_device_automation)
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
assert trigger.async_extract_devices(trigger_conf) == expected