Compare commits

..

1 Commits

Author SHA1 Message Date
Erik 0f789a6797 Improve tests of helpers.trigger.extract_devices/entities 2026-05-28 10:06:24 +02:00
44 changed files with 1000 additions and 606 deletions
+1 -1
View File
@@ -39,7 +39,7 @@ on:
env:
CACHE_VERSION: 3
MYPY_CACHE_VERSION: 1
HA_SHORT_VERSION: "2026.7"
HA_SHORT_VERSION: "2026.6"
ADDITIONAL_PYTHON_VERSIONS: "[]"
# 10.3 is the oldest supported version
# - 10.3.32 is the version currently shipped with Synology (as of 17 Feb 2022)
+3 -16
View File
@@ -11,7 +11,7 @@ from homeassistant.helpers.hassio import is_hassio
from .agent import BackupAgent, LocalBackupAgent, OnProgressCallback
from .const import DOMAIN, LOGGER
from .models import AgentBackup, BackupNotFound, InvalidBackupFilename
from .models import AgentBackup, BackupNotFound
from .util import read_backup, suggested_filename
@@ -54,13 +54,7 @@ class CoreLocalBackupAgent(LocalBackupAgent):
try:
backup = read_backup(backup_path)
backups[backup.backup_id] = (backup, backup_path)
except (
OSError,
TarError,
json.JSONDecodeError,
KeyError,
InvalidBackupFilename,
) as err:
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
return backups
@@ -128,14 +122,7 @@ class CoreLocalBackupAgent(LocalBackupAgent):
def get_new_backup_path(self, backup: AgentBackup) -> Path:
"""Return the local path to a new backup."""
candidate = self._backup_dir / suggested_filename(backup)
# suggested_filename does not strip separators; refuse paths that would
# land outside the backup directory.
if candidate.parent != self._backup_dir:
raise InvalidBackupFilename(
f"Refusing to write outside {self._backup_dir}: {candidate}"
)
return candidate
return self._backup_dir / suggested_filename(backup)
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
"""Delete a backup file."""
+1 -7
View File
@@ -1978,13 +1978,7 @@ class CoreBackupReaderWriter(BackupReaderWriter):
try:
backup = await async_add_executor_job(read_backup, temp_file)
except (
OSError,
tarfile.TarError,
json.JSONDecodeError,
KeyError,
InvalidBackupFilename,
) as err:
except (OSError, tarfile.TarError, json.JSONDecodeError, KeyError) as err:
LOGGER.warning("Unable to parse backup %s: %s", temp_file, err)
raise
+3 -10
View File
@@ -6,7 +6,7 @@ import copy
from dataclasses import dataclass, replace
from io import BytesIO
import json
from pathlib import Path, PurePath, PureWindowsPath
from pathlib import Path, PurePath
from queue import SimpleQueue
import tarfile
import threading
@@ -34,7 +34,7 @@ from homeassistant.util.async_iterator import (
from homeassistant.util.json import JsonObjectType, json_loads_object
from .const import BUF_SIZE, LOGGER, SECURETAR_CREATE_VERSION
from .models import AddonInfo, AgentBackup, Folder, InvalidBackupFilename
from .models import AddonInfo, AgentBackup, Folder
class DecryptError(HomeAssistantError):
@@ -109,13 +109,6 @@ def read_backup(backup_path: Path) -> AgentBackup:
extra_metadata = cast(dict[str, bool | str], data.get("extra", {}))
date = extra_metadata.get("supervisor.backup_request_date", data["date"])
name = cast(str, data["name"])
# The name is used to derive the on-disk filename via suggested_filename;
# reject anything that could escape the backup directory.
safe_name = PureWindowsPath(name).name
if safe_name != name or name in ("", ".", ".."):
raise InvalidBackupFilename(f"Invalid backup name: {name!r}")
return AgentBackup(
addons=addons,
backup_id=cast(str, data["slug"]),
@@ -125,7 +118,7 @@ def read_backup(backup_path: Path) -> AgentBackup:
folders=folders,
homeassistant_included=homeassistant_included,
homeassistant_version=homeassistant_version,
name=name,
name=cast(str, data["name"]),
protected=cast(bool, data.get("protected", False)),
size=backup_path.stat().st_size,
)
@@ -12,13 +12,18 @@ from homeassistant.const import (
CONF_DOMAIN,
CONF_ENTITY_ID,
CONF_EVENT,
CONF_OPTIONS,
CONF_PLATFORM,
CONF_TYPE,
CONF_ZONE,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.trigger import (
TriggerActionType,
TriggerInfo,
_async_attach_trigger_cls,
)
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
@@ -79,16 +84,18 @@ async def async_attach_trigger(
event = zone.EVENT_ENTER
else:
event = zone.EVENT_LEAVE
zone_config = {
CONF_PLATFORM: ZONE_DOMAIN,
CONF_ENTITY_ID: config[CONF_ENTITY_ID],
CONF_ZONE: config[CONF_ZONE],
CONF_EVENT: event,
}
zone_config = await zone.async_validate_trigger_config(hass, zone_config)
return await zone.async_attach_trigger(
hass, zone_config, action, trigger_info, platform_type="device"
zone_config = await zone.LegacyZoneTrigger.async_validate_config(
hass,
{
CONF_OPTIONS: {
CONF_ENTITY_ID: [config[CONF_ENTITY_ID]],
CONF_ZONE: config[CONF_ZONE],
CONF_EVENT: event,
}
},
)
return await _async_attach_trigger_cls(
hass, zone.LegacyZoneTrigger, "device", zone_config, action, trigger_info
)
@@ -169,35 +169,11 @@ class BaseTrackerEntity(Entity):
_attr_entity_category = EntityCategory.DIAGNOSTIC
_attr_source_type: SourceType
def __init_subclass__(cls, **kwargs: Any) -> None:
"""Post initialisation processing."""
super().__init_subclass__(**kwargs)
if "battery_level" in cls.__dict__:
if cls.__module__.startswith("homeassistant.components."):
# Don't ask users to report issue for built in integrations,
# they already have issues opened on them.
return
report_issue = async_suggest_report_issue(
async_get_hass_or_none(), module=cls.__module__
)
_LOGGER.warning(
(
"%s::%s is overriding the deprecated battery_level property on "
"a subclass of BaseTrackerEntity, this will be unsupported from "
"Home Assistant 2027.7, please %s"
),
cls.__module__,
cls.__name__,
report_issue,
)
@cached_property
def battery_level(self) -> int | None:
"""Return the battery level of the device.
Percentage from 0-100.
The property is deprecated and will be removed in Home Assistant 2027.7.
"""
return None
@@ -50,7 +50,7 @@ class DuckDnsUpdateCoordinator(DataUpdateCoordinator[None]):
"""Update Duck DNS."""
retry_after = BACKOFF_INTERVALS[
min(self.failed, len(BACKOFF_INTERVALS) - 1)
min(self.failed, len(BACKOFF_INTERVALS))
].total_seconds()
try:
@@ -8,7 +8,7 @@
"integration_type": "service",
"iot_class": "cloud_push",
"requirements": [
"google-cloud-texttospeech==2.36.0",
"google-cloud-speech==2.38.0"
"google-cloud-texttospeech==2.25.1",
"google-cloud-speech==2.31.1"
]
}
@@ -5,5 +5,5 @@
"documentation": "https://www.home-assistant.io/integrations/google_pubsub",
"iot_class": "cloud_push",
"quality_scale": "legacy",
"requirements": ["google-cloud-pubsub==2.38.0"]
"requirements": ["google-cloud-pubsub==2.29.0"]
}
@@ -7,5 +7,5 @@
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["google", "homeassistant.helpers.location"],
"requirements": ["google-maps-routing==0.10.0"]
"requirements": ["google-maps-routing==0.6.15"]
}
-13
View File
@@ -439,19 +439,6 @@ DISCOVERY_SCHEMAS = [
),
allow_multi=True, # also used for climate entity
),
MatterDiscoverySchema(
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
key="SoilMoistureSensor",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.MOISTURE,
state_class=SensorStateClass.MEASUREMENT,
),
entity_class=MatterSensor,
required_attributes=(
clusters.SoilMeasurement.Attributes.SoilMoistureMeasuredValue,
),
),
MatterDiscoverySchema(
platform=Platform.SENSOR,
entity_description=MatterSensorEntityDescription(
+1 -1
View File
@@ -1140,7 +1140,7 @@
},
"step": {
"confirm": {
"description": "Home Assistant needs to migrate to MQTT protocol version 5. The currently configured protocol version for broker {broker} is {protocol}. This protocol version is deprecated, and support for it will be removed.\n\nSubmitting this form will attempt to migrate your MQTT broker configuration to use protocol version 5 to fix this issue. If the broker cannot be reached using MQTT protocol version 5, for example because it does not support it, the migration will be aborted.",
"description": "Home Assistant is migrating to MQTT protocol version 5. The currently configured protocol version for broker {broker} is {protocol}. This protocol version is deprecated, and support for it will be removed.\n\nSubmitting this form will try to migrate your MQTT broker configuration to use protocol version 5 to fix this issue.",
"title": "MQTT protocol change required"
}
}
+6 -4
View File
@@ -271,7 +271,7 @@ class MqttValve(MqttEntity, ValveEntity):
self._range, float(position_payload)
)
except ValueError:
_LOGGER.debug(
_LOGGER.warning(
"Ignoring non numeric payload '%s' received on topic '%s'",
position_payload,
msg.topic,
@@ -279,9 +279,9 @@ class MqttValve(MqttEntity, ValveEntity):
else:
percentage_payload = min(max(percentage_payload, 0), 100)
self._attr_current_valve_position = percentage_payload
# Reset opening/closing when a position update is received
# without an explicit opening/closing transitional state.
state = state or RESET_CLOSING_OPENING
# Reset closing and opening if the valve is fully opened or fully closed
if state is None and percentage_payload in (0, 100):
state = RESET_CLOSING_OPENING
position_set = True
if state_payload and state is None and not position_set:
_LOGGER.warning(
@@ -291,6 +291,8 @@ class MqttValve(MqttEntity, ValveEntity):
state_payload,
)
return
if state is None:
return
self._update_state(state)
@callback
@@ -1,12 +1,6 @@
"""The OVHcloud AI Endpoints integration."""
from openai import (
AsyncOpenAI,
AuthenticationError,
BadRequestError,
OpenAIError,
PermissionDeniedError,
)
from openai import AsyncOpenAI, AuthenticationError, BadRequestError, OpenAIError
from openai.types.chat import ChatCompletionUserMessageParam
from homeassistant.config_entries import ConfigEntry
@@ -58,7 +52,7 @@ async def async_setup_entry(
try:
await _validate_api_key(client)
except (AuthenticationError, PermissionDeniedError) as err:
except AuthenticationError as err:
raise ConfigEntryAuthFailed(err) from err
except OpenAIError as err:
raise ConfigEntryNotReady(err) from err
@@ -1,10 +1,9 @@
"""Config flow for the OVHcloud AI Endpoints integration."""
from collections.abc import Mapping
import logging
from typing import Any
from openai import AsyncOpenAI, AuthenticationError, OpenAIError, PermissionDeniedError
from openai import AsyncOpenAI, AuthenticationError, OpenAIError
import voluptuous as vol
from homeassistant.config_entries import (
@@ -31,8 +30,6 @@ from .const import CONF_PROMPT, DOMAIN, RECOMMENDED_CONVERSATION_OPTIONS
_LOGGER = logging.getLogger(__name__)
STEP_REAUTH_DATA_SCHEMA = vol.Schema({vol.Required(CONF_API_KEY): str})
class OVHcloudAIEndpointsConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for OVHcloud AI Endpoints."""
@@ -58,7 +55,7 @@ class OVHcloudAIEndpointsConfigFlow(ConfigFlow, domain=DOMAIN):
client = _create_client(self.hass, user_input[CONF_API_KEY])
try:
await _validate_api_key(client)
except AuthenticationError, PermissionDeniedError:
except AuthenticationError:
errors["base"] = "invalid_auth"
except OpenAIError:
errors["base"] = "cannot_connect"
@@ -80,39 +77,6 @@ class OVHcloudAIEndpointsConfigFlow(ConfigFlow, domain=DOMAIN):
errors=errors,
)
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
) -> ConfigFlowResult:
"""Perform reauth upon an API authentication error."""
return await self.async_step_reauth_confirm()
async def async_step_reauth_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm reauthentication dialog."""
errors: dict[str, str] = {}
if user_input is not None:
client = _create_client(self.hass, user_input[CONF_API_KEY])
try:
await _validate_api_key(client)
except AuthenticationError, PermissionDeniedError:
errors["base"] = "invalid_auth"
except OpenAIError:
errors["base"] = "cannot_connect"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
return self.async_update_reload_and_abort(
self._get_reauth_entry(),
data_updates=user_input,
)
return self.async_show_form(
step_id="reauth_confirm",
data_schema=STEP_REAUTH_DATA_SCHEMA,
errors=errors,
)
class ConversationFlowHandler(ConfigSubentryFlow):
"""Handle conversation subentry flow."""
@@ -44,7 +44,7 @@ rules:
status: exempt
comment: the integration only integrates stateless entities
parallel-updates: todo
reauthentication-flow: done
reauthentication-flow: todo
test-coverage: done
# Gold
@@ -1,8 +1,7 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
@@ -10,15 +9,6 @@
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"reauth_confirm": {
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]"
},
"data_description": {
"api_key": "[%key:component::ovhcloud_ai_endpoints::config::step::user::data_description::api_key%]"
},
"description": "The OVHcloud AI Endpoints API key is no longer valid. Please enter a new one."
},
"user": {
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]"
+1 -1
View File
@@ -8,6 +8,6 @@
"iot_class": "local_push",
"loggers": ["wiim.sdk", "async_upnp_client"],
"quality_scale": "bronze",
"requirements": ["wiim==0.1.4"],
"requirements": ["wiim==0.1.2"],
"zeroconf": ["_linkplay._tcp.local."]
}
@@ -349,12 +349,15 @@ class WiimMediaPlayerEntity(WiimBaseEntity, MediaPlayerEntity):
sdk_status_str,
)
else:
self._device.playing_status = sdk_status
if sdk_status == SDKPlayingStatus.STOPPED:
LOGGER.debug(
"Device %s: TransportState is STOPPED."
" Resetting media position and metadata",
self.entity_id,
)
self._device.current_position = 0
self._device.current_track_duration = 0
self._attr_media_position_updated_at = None
self._attr_media_duration = None
self._attr_media_position = None
@@ -63,7 +63,7 @@ class MusicCastDeviceEntity(MusicCastEntity):
},
manufacturer=BRAND,
model=self.coordinator.data.model_name,
sw_version=str(self.coordinator.data.system_version),
sw_version=self.coordinator.data.system_version,
)
if self._zone_id == DEFAULT_ZONE:
+14
View File
@@ -3,5 +3,19 @@
"reload": {
"service": "mdi:reload"
}
},
"triggers": {
"entered": {
"trigger": "mdi:map-marker-plus"
},
"left": {
"trigger": "mdi:map-marker-minus"
},
"occupancy_cleared": {
"trigger": "mdi:account-off"
},
"occupancy_detected": {
"trigger": "mdi:account-group"
}
}
}
@@ -1,8 +1,74 @@
{
"common": {
"trigger_behavior_name": "Trigger when",
"trigger_for_name": "For at least",
"trigger_zone_description": "The zone to trigger on.",
"trigger_zone_name": "Zone"
},
"services": {
"reload": {
"description": "Reloads zones from the YAML-configuration.",
"name": "Reload zones"
}
},
"triggers": {
"entered": {
"description": "Triggers when one or more persons or device trackers enter a zone.",
"fields": {
"behavior": {
"name": "[%key:component::zone::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::common::trigger_zone_description%]",
"name": "[%key:component::zone::common::trigger_zone_name%]"
}
},
"name": "Entered zone"
},
"left": {
"description": "Triggers when one or more persons or device trackers leave a zone.",
"fields": {
"behavior": {
"name": "[%key:component::zone::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::common::trigger_zone_description%]",
"name": "[%key:component::zone::common::trigger_zone_name%]"
}
},
"name": "Left zone"
},
"occupancy_cleared": {
"description": "Triggers when a zone transitions from occupied to unoccupied.",
"fields": {
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "[%key:component::zone::triggers::occupancy_detected::fields::zone::description%]",
"name": "[%key:component::zone::triggers::occupancy_detected::fields::zone::name%]"
}
},
"name": "Zone occupancy cleared"
},
"occupancy_detected": {
"description": "Triggers when a zone transitions to an occupied state.",
"fields": {
"for": {
"name": "[%key:component::zone::common::trigger_for_name%]"
},
"zone": {
"description": "The zone to monitor.",
"name": "Zone"
}
},
"name": "Zone occupancy detected"
}
}
}
+229 -76
View File
@@ -1,22 +1,26 @@
"""Offer zone automation rules."""
import logging
from typing import TYPE_CHECKING, Any, cast
import voluptuous as vol
from homeassistant.components.device_tracker import ATTR_IN_ZONES
from homeassistant.const import (
ATTR_FRIENDLY_NAME,
CONF_ENTITY_ID,
CONF_EVENT,
CONF_PLATFORM,
CONF_FOR,
CONF_OPTIONS,
CONF_TARGET,
CONF_ZONE,
)
from homeassistant.core import (
CALLBACK_TYPE,
Event,
EventStateChangedData,
HassJob,
HomeAssistant,
State,
callback,
)
from homeassistant.helpers import (
@@ -24,8 +28,18 @@ from homeassistant.helpers import (
entity_registry as er,
location,
)
from homeassistant.helpers.automation import (
DomainSpec,
move_top_level_schema_fields_to_options,
)
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.trigger import TriggerActionType, TriggerInfo
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR,
EntityTriggerBase,
Trigger,
TriggerActionRunner,
TriggerConfig,
)
from homeassistant.helpers.typing import ConfigType
from . import condition
@@ -38,93 +52,232 @@ _LOGGER = logging.getLogger(__name__)
_EVENT_DESCRIPTION = {EVENT_ENTER: "entering", EVENT_LEAVE: "leaving"}
_TRIGGER_SCHEMA = cv.TRIGGER_BASE_SCHEMA.extend(
_LEGACY_OPTIONS_SCHEMA: dict[vol.Marker, Any] = {
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
vol.Required(CONF_ZONE): cv.entity_id,
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(EVENT_ENTER, EVENT_LEAVE),
}
_LEGACY_TRIGGER_OPTIONS_SCHEMA = vol.Schema(
{
vol.Required(CONF_PLATFORM): "zone",
vol.Required(CONF_ENTITY_ID): cv.entity_ids_or_uuids,
vol.Required(CONF_ZONE): cv.entity_id,
vol.Required(CONF_EVENT, default=DEFAULT_EVENT): vol.Any(
EVENT_ENTER, EVENT_LEAVE
),
vol.Required(CONF_OPTIONS): _LEGACY_OPTIONS_SCHEMA,
},
)
# New-style zone trigger schema
_ZONE_TRIGGER_SCHEMA = ENTITY_STATE_TRIGGER_SCHEMA_WITH_BEHAVIOR.extend(
{
vol.Required(CONF_OPTIONS): {
vol.Required(CONF_ZONE): cv.entity_domain("zone"),
},
}
)
async def async_validate_trigger_config(
hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate trigger config."""
config = _TRIGGER_SCHEMA(config)
registry = er.async_get(hass)
config[CONF_ENTITY_ID] = er.async_validate_entity_ids(
registry, config[CONF_ENTITY_ID]
)
return config
_DOMAIN_SPECS: dict[str, DomainSpec] = {
"person": DomainSpec(),
"device_tracker": DomainSpec(),
}
async def async_attach_trigger(
hass: HomeAssistant,
config: ConfigType,
action: TriggerActionType,
trigger_info: TriggerInfo,
*,
platform_type: str = "zone",
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
trigger_data = trigger_info["trigger_data"]
entity_id: list[str] = config[CONF_ENTITY_ID]
zone_entity_id: str = config[CONF_ZONE]
event: str = config[CONF_EVENT]
job = HassJob(action)
class LegacyZoneTrigger(Trigger):
"""Legacy zone trigger (platform: zone)."""
@callback
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
"""Listen for state changes and calls action."""
entity = zone_event.data["entity_id"]
from_s = zone_event.data["old_state"]
to_s = zone_event.data["new_state"]
@classmethod
async def async_validate_complete_config(
cls, hass: HomeAssistant, complete_config: ConfigType
) -> ConfigType:
"""Validate complete config, migrating legacy format to options."""
complete_config = move_top_level_schema_fields_to_options(
complete_config, _LEGACY_OPTIONS_SCHEMA
)
return await super().async_validate_complete_config(hass, complete_config)
if (from_s and not location.has_location(from_s)) or (
to_s and not location.has_location(to_s)
):
return
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
config = cast(ConfigType, _LEGACY_TRIGGER_OPTIONS_SCHEMA(config))
registry = er.async_get(hass)
config[CONF_OPTIONS][CONF_ENTITY_ID] = er.async_validate_entity_ids(
registry, config[CONF_OPTIONS][CONF_ENTITY_ID]
)
return config
if not (zone_state := hass.states.get(zone_entity_id)):
_LOGGER.warning(
(
"Automation '%s' is referencing non-existing zone '%s' in a zone"
" trigger"
),
trigger_info["name"],
zone_entity_id,
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.options is not None
self._options = config.options
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Listen for state changes based on configuration."""
entity_id: list[str] = self._options[CONF_ENTITY_ID]
zone_entity_id: str = self._options[CONF_ZONE]
event: str = self._options[CONF_EVENT]
@callback
def zone_automation_listener(zone_event: Event[EventStateChangedData]) -> None:
"""Listen for state changes and calls action."""
entity = zone_event.data["entity_id"]
from_s = zone_event.data["old_state"]
to_s = zone_event.data["new_state"]
if (from_s and not location.has_location(from_s)) or (
to_s and not location.has_location(to_s)
):
return
if not (zone_state := self._hass.states.get(zone_entity_id)):
_LOGGER.warning(
"Non-existing zone '%s' in a zone trigger",
zone_entity_id,
)
return
from_match = (
condition.zone(self._hass, zone_state, from_s) if from_s else False
)
return
to_match = condition.zone(self._hass, zone_state, to_s) if to_s else False
from_match = condition.zone(hass, zone_state, from_s) if from_s else False
to_match = condition.zone(hass, zone_state, to_s) if to_s else False
if (event == EVENT_ENTER and not from_match and to_match) or (
event == EVENT_LEAVE and from_match and not to_match
):
description = (
f"{entity} {_EVENT_DESCRIPTION[event]}"
f" {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
)
hass.async_run_hass_job(
job,
{
"trigger": {
**trigger_data,
"platform": platform_type,
if (event == EVENT_ENTER and not from_match and to_match) or (
event == EVENT_LEAVE and from_match and not to_match
):
description = f"{entity} {_EVENT_DESCRIPTION[event]} {zone_state.attributes[ATTR_FRIENDLY_NAME]}"
run_action(
{
"entity_id": entity,
"from_state": from_s,
"to_state": to_s,
"zone": zone_state,
"event": event,
"description": description,
}
},
to_s.context if to_s else None,
)
},
description,
to_s.context if to_s else None,
)
return async_track_state_change_event(hass, entity_id, zone_automation_listener)
return async_track_state_change_event(
self._hass, entity_id, zone_automation_listener
)
class ZoneTriggerBase(EntityTriggerBase):
"""Base for zone-based triggers targeting person and device_tracker entities."""
_domain_specs = _DOMAIN_SPECS
_schema = _ZONE_TRIGGER_SCHEMA
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize the trigger."""
super().__init__(hass, config)
self._zone: str = self._options[CONF_ZONE]
def _in_target_zone(self, state: State) -> bool:
"""Check if the entity is in the selected zone."""
in_zones = state.attributes.get(ATTR_IN_ZONES) or ()
return self._zone in in_zones
class EnteredZoneTrigger(ZoneTriggerBase):
"""Trigger when an entity enters the selected zone."""
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the entity was not already in the selected zone."""
return not self._in_target_zone(from_state)
def is_valid_state(self, state: State) -> bool:
"""Check that the entity is now in the selected zone."""
return self._in_target_zone(state)
class LeftZoneTrigger(ZoneTriggerBase):
"""Trigger when an entity leaves the selected zone."""
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the entity was previously in the selected zone."""
return self._in_target_zone(from_state)
def is_valid_state(self, state: State) -> bool:
"""Check that the entity is no longer in the selected zone."""
return not self._in_target_zone(state)
_OCCUPANCY_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_OPTIONS, default={}): {
vol.Required(CONF_ZONE): cv.entity_id,
vol.Optional(CONF_FOR): cv.positive_time_period,
},
}
)
class _ZoneOccupancyTriggerBase(EntityTriggerBase):
"""Base for zone occupancy triggers (single zone, no behavior)."""
_domain_specs = {"zone": DomainSpec()}
_schema = _OCCUPANCY_TRIGGER_SCHEMA
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config and synthesize a target from the zone option."""
config = cast(ConfigType, cls._schema(config))
config[CONF_TARGET] = {CONF_ENTITY_ID: [config[CONF_OPTIONS][CONF_ZONE]]}
return config
@staticmethod
def _occupancy_count(state: State) -> int | None:
"""Return the zone's persons-in-zone count; None if unparsable."""
try:
return int(state.state)
except TypeError, ValueError:
return None
@classmethod
def _is_occupied(cls, state: State) -> bool:
"""Return True if the zone has at least one occupant."""
count = cls._occupancy_count(state)
return count is not None and count >= 1
class OccupancyDetectedTrigger(_ZoneOccupancyTriggerBase):
"""Trigger when a zone transitions to an occupied state."""
def is_valid_state(self, state: State) -> bool:
"""Check that the zone is occupied."""
return self._is_occupied(state)
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the zone was previously not occupied."""
return not self._is_occupied(from_state)
class OccupancyClearedTrigger(_ZoneOccupancyTriggerBase):
"""Trigger when a zone transitions from occupied to unoccupied."""
def is_valid_state(self, state: State) -> bool:
"""Check that the zone is empty (count == 0)."""
return self._occupancy_count(state) == 0
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check that the zone was previously occupied."""
return self._is_occupied(from_state)
TRIGGERS: dict[str, type[Trigger]] = {
"_": LegacyZoneTrigger,
"entered": EnteredZoneTrigger,
"left": LeftZoneTrigger,
"occupancy_detected": OccupancyDetectedTrigger,
"occupancy_cleared": OccupancyClearedTrigger,
}
async def async_get_triggers(hass: HomeAssistant) -> dict[str, type[Trigger]]:
"""Return the triggers for zones."""
return TRIGGERS
@@ -0,0 +1,42 @@
.trigger_zone: &trigger_zone
target:
entity:
domain:
- person
- device_tracker
fields:
behavior:
required: true
default: each
selector:
automation_behavior:
mode: trigger
for:
required: true
default: 00:00:00
selector:
duration:
zone:
required: true
selector:
entity:
domain: zone
entered: *trigger_zone
left: *trigger_zone
.trigger_occupancy: &trigger_occupancy
fields:
for:
required: true
default: 00:00:00
selector:
duration:
zone:
required: true
selector:
entity:
domain: zone
occupancy_detected: *trigger_occupancy
occupancy_cleared: *trigger_occupancy
+1 -1
View File
@@ -14,7 +14,7 @@ if TYPE_CHECKING:
APPLICATION_NAME: Final = "HomeAssistant"
MAJOR_VERSION: Final = 2026
MINOR_VERSION: Final = 7
MINOR_VERSION: Final = 6
PATCH_VERSION: Final = "0.dev0"
__short_version__: Final = f"{MAJOR_VERSION}.{MINOR_VERSION}"
__version__: Final = f"{__short_version__}.{PATCH_VERSION}"
+8 -1
View File
@@ -1714,7 +1714,14 @@ def async_extract_entities(trigger_conf: dict) -> list[str]:
return [trigger_conf[CONF_OPTIONS][CONF_ENTITY_ID]]
if trigger_conf[CONF_PLATFORM] == "zone":
return trigger_conf[CONF_ENTITY_ID] + [trigger_conf[CONF_ZONE]] # type: ignore[no-any-return]
options = trigger_conf[CONF_OPTIONS]
return [*options[CONF_ENTITY_ID], options[CONF_ZONE]]
if trigger_conf[CONF_PLATFORM] in ("zone.entered", "zone.left"):
return [
*async_extract_targets(trigger_conf, CONF_ENTITY_ID),
trigger_conf[CONF_OPTIONS][CONF_ZONE],
]
if trigger_conf[CONF_PLATFORM] == "geo_location":
return [trigger_conf[CONF_ZONE]]
+1 -1
View File
@@ -148,7 +148,7 @@ iso4217!=1.10.20220401
# protobuf must be in package constraints for the wheel
# builder to build binary wheels
protobuf==7.34.1
protobuf==6.32.0
# faust-cchardet: Ensure we have a version we can build wheels
# 2.1.18 is the first version that works with our wheel builder
+1 -1
View File
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "homeassistant"
version = "2026.7.0.dev0"
version = "2026.6.0.dev0"
license = "Apache-2.0"
license-files = ["LICENSE*", "homeassistant/backports/LICENSE*"]
description = "Open-source home automation platform running on Python 3."
+5 -5
View File
@@ -1122,19 +1122,19 @@ goodwe==0.4.10
google-api-python-client==2.71.0
# homeassistant.components.google_pubsub
google-cloud-pubsub==2.38.0
google-cloud-pubsub==2.29.0
# homeassistant.components.google_cloud
google-cloud-speech==2.38.0
google-cloud-speech==2.31.1
# homeassistant.components.google_cloud
google-cloud-texttospeech==2.36.0
google-cloud-texttospeech==2.25.1
# homeassistant.components.google_generative_ai_conversation
google-genai==1.59.0
# homeassistant.components.google_travel_time
google-maps-routing==0.10.0
google-maps-routing==0.6.15
# homeassistant.components.nest
google-nest-sdm==9.1.2
@@ -3357,7 +3357,7 @@ whois==0.9.27
wiffi==1.1.2
# homeassistant.components.wiim
wiim==0.1.4
wiim==0.1.2
# homeassistant.components.wirelesstag
wirelesstagpy==0.8.1
+1 -1
View File
@@ -46,7 +46,7 @@ types-caldav==1.3.0.20250516
types-chardet==0.1.5
types-decorator==5.2.0.20260408
types-pexpect==4.9.0.20260408
types-protobuf==7.34.1.20260408
types-protobuf==6.32.1.20260221
types-psutil==7.2.2.20260408
types-pyserial==3.5.0.20260408
types-python-dateutil==2.9.0.20260408
+1 -1
View File
@@ -132,7 +132,7 @@ iso4217!=1.10.20220401
# protobuf must be in package constraints for the wheel
# builder to build binary wheels
protobuf==7.34.1
protobuf==6.32.0
# faust-cchardet: Ensure we have a version we can build wheels
# 2.1.18 is the first version that works with our wheel builder
-30
View File
@@ -2088,36 +2088,6 @@ async def test_receive_backup_path_traversal(
assert resp.status == 400
@pytest.mark.parametrize(
"name",
[
"/absolute/path",
"../parent",
"with/slash",
],
)
async def test_receive_backup_rejects_unsafe_inner_name(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
name: str,
) -> None:
"""Test receive backup rejects an inner name that would escape the backup dir."""
await setup_backup_integration(hass)
client = await hass_client()
backup = replace(TEST_BACKUP_ABC123, name=name)
with patch(
"homeassistant.components.backup.manager.read_backup",
return_value=backup,
):
resp = await client.post(
"/api/backup/upload?agent_id=backup.local",
data={"file": StringIO("test")},
)
assert resp.status == 400
async def test_receive_backup_busy_manager(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
-32
View File
@@ -14,7 +14,6 @@ import pytest
import securetar
from homeassistant.components.backup import DOMAIN, AddonInfo, AgentBackup, Folder
from homeassistant.components.backup.models import InvalidBackupFilename
from homeassistant.components.backup.util import (
DecryptedBackupStreamer,
EncryptedBackupStreamer,
@@ -159,37 +158,6 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
assert backup == expected_backup
@pytest.mark.parametrize(
"name",
[
"/absolute/path",
"../parent",
"with/slash",
"with\\backslash",
"C:\\drive\\path",
"",
".",
"..",
],
)
def test_read_backup_rejects_unsafe_name(name: str) -> None:
"""Test that read_backup rejects names that could escape the backup directory."""
backup_json_content = (
b'{"compressed":true,"date":"2024-12-02T07:23:58.261875-05:00","homeassistant":'
b'{"exclude_database":true,"version":"2024.12.0.dev0"},"name":"'
+ name.encode().replace(b"\\", b"\\\\")
+ b'","protected":true,"slug":"455645fe","type":"partial","version":2}'
)
mock_path = Mock()
mock_path.stat.return_value.st_size = 1234
with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
tar_ctx = mock_open_tar.return_value.__enter__.return_value
tar_ctx.extractfile.return_value.read.return_value = backup_json_content
with pytest.raises(InvalidBackupFilename):
read_backup(mock_path)
@pytest.mark.parametrize(
("backup", "password", "validation_result", "expected_messages"),
[
+29 -10
View File
@@ -1586,12 +1586,12 @@ async def _validate_trigger_options(
options: dict[str, Any] | None,
*,
valid: bool,
supports_target: bool = True,
) -> None:
"""Assert that a trigger accepts or rejects the given options during validation."""
trigger_config: dict[str, Any] = {
CONF_PLATFORM: trigger,
CONF_TARGET: {ATTR_LABEL_ID: "test_label"},
}
trigger_config: dict[str, Any] = {CONF_PLATFORM: trigger}
if supports_target:
trigger_config[CONF_TARGET] = {ATTR_LABEL_ID: "test_label"}
if options is not None:
trigger_config[CONF_OPTIONS] = options
if valid:
@@ -1608,6 +1608,7 @@ async def assert_trigger_options_supported(
*,
supports_behavior: bool,
supports_duration: bool,
supports_target: bool = True,
) -> None:
"""Assert which options a trigger supports.
@@ -1624,9 +1625,15 @@ async def assert_trigger_options_supported(
# Minimal config should always be valid
supports_empty = not bool(base_options)
await _validate_trigger_options(hass, trigger, None, valid=supports_empty)
await _validate_trigger_options(hass, trigger, {}, valid=supports_empty)
await _validate_trigger_options(hass, trigger, base_options, valid=True)
await _validate_trigger_options(
hass, trigger, None, valid=supports_empty, supports_target=supports_target
)
await _validate_trigger_options(
hass, trigger, {}, valid=supports_empty, supports_target=supports_target
)
await _validate_trigger_options(
hass, trigger, base_options, valid=True, supports_target=supports_target
)
def _merge(extra: dict[str, Any]) -> dict[str, Any]:
return {**(base_options or {}), **extra}
@@ -1634,18 +1641,30 @@ async def assert_trigger_options_supported(
# Behavior
for behavior in ("each", "first", "all"):
await _validate_trigger_options(
hass, trigger, _merge({"behavior": behavior}), valid=supports_behavior
hass,
trigger,
_merge({"behavior": behavior}),
valid=supports_behavior,
supports_target=supports_target,
)
# Duration
for for_value in ({"seconds": 5}, "00:00:05", 5):
await _validate_trigger_options(
hass, trigger, _merge({"for": for_value}), valid=supports_duration
hass,
trigger,
_merge({"for": for_value}),
valid=supports_duration,
supports_target=supports_target,
)
# Unknown option should always be rejected
await _validate_trigger_options(
hass, trigger, _merge({"unknown_option": True}), valid=False
hass,
trigger,
_merge({"unknown_option": True}),
valid=False,
supports_target=supports_target,
)
@@ -1379,31 +1379,6 @@ def test_base_tracker_entity() -> None:
entity.state_attributes # noqa: B018
def test_battery_level_override_deprecation_warning(
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that overriding battery_level in a subclass logs a warning."""
error_message = "is overriding the deprecated battery_level property"
caplog.clear()
class _SubclassWithOverride(TrackerEntity):
@property
def battery_level(self) -> int | None:
return 50
assert error_message in caplog.text
assert _SubclassWithOverride.__name__ in caplog.text
# No warning for a subclass that does not override battery_level
caplog.clear()
class _SubclassWithoutOverride(TrackerEntity):
pass
assert error_message not in caplog.text
async def test_attr_location_name_deprecation_warning(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
-1
View File
@@ -78,7 +78,6 @@ FIXTURES = [
"mock_pressure_sensor",
"mock_pump",
"mock_room_airconditioner",
"mock_soil_sensor",
"mock_solar_inverter",
"mock_speaker",
"mock_switch_unit",
@@ -1,87 +0,0 @@
{
"node_id": 101,
"date_commissioned": "2024-11-27T00:00:00.000000",
"last_interview": "2024-11-27T00:00:00.000000",
"interview_version": 2,
"attributes": {
"0/29/0": [
{
"0": 22,
"1": 1
}
],
"0/29/1": [
4, 29, 31, 40, 42, 43, 44, 48, 49, 50, 51, 52, 53, 54, 55, 59, 60, 62, 63,
64, 65
],
"0/29/2": [41],
"0/29/3": [1],
"0/29/65532": 0,
"0/29/65533": 1,
"0/29/65528": [],
"0/29/65529": [],
"0/29/65531": [0, 1, 2, 3, 65528, 65529, 65531, 65532, 65533],
"0/40/0": 1,
"0/40/1": "Nabu Casa",
"0/40/2": 65521,
"0/40/3": "Mock SoilSensor",
"0/40/4": 32768,
"0/40/5": "Mock Soil Sensor",
"0/40/6": "XX",
"0/40/7": 0,
"0/40/8": "v1.0",
"0/40/9": 1,
"0/40/10": "v1.0",
"0/40/11": "20241127",
"0/40/12": "",
"0/40/13": "",
"0/40/14": "",
"0/40/15": "TEST_SN",
"0/40/16": false,
"0/40/17": true,
"0/40/18": "mock-soil-sensor",
"0/40/19": {
"0": 3,
"1": 3
},
"0/40/65532": 0,
"0/40/65533": 1,
"0/40/65528": [],
"0/40/65529": [],
"0/40/65531": [
0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
65528, 65529, 65531, 65532, 65533
],
"1/3/65529": [0, 64],
"1/3/65531": [0, 1, 65528, 65529, 65531, 65532, 65533],
"1/29/0": [
{
"0": 69,
"1": 1
}
],
"1/29/1": [3, 29, 57, 1072, 40],
"1/29/2": [],
"1/29/3": [9, 10],
"1/29/65532": null,
"1/29/65533": 1,
"1/29/65528": [],
"1/29/65529": [],
"1/29/65531": [0, 1, 2, 3, 65528, 65529, 65531, 65532, 65533],
"1/1072/0": {
"0": 17,
"1": true,
"2": 0,
"3": 100,
"4": []
},
"1/1072/1": 50,
"1/1072/65532": 0,
"1/1072/65533": 1,
"1/1072/65528": [],
"1/1072/65529": [],
"1/1072/65531": [0, 1, 65528, 65529, 65531, 65532, 65533]
},
"available": true,
"attribute_subscriptions": []
}
@@ -18619,61 +18619,6 @@
'state': '0.0',
})
# ---
# name: test_sensors[mock_soil_sensor][sensor.mock_soil_sensor_moisture-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.mock_soil_sensor_moisture',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Moisture',
'options': dict({
}),
'original_device_class': <SensorDeviceClass.MOISTURE: 'moisture'>,
'original_icon': None,
'original_name': 'Moisture',
'platform': 'matter',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '00000000000004D2-0000000000000065-MatterNodeDevice-1-SoilMoistureSensor-1072-1',
'unit_of_measurement': '%',
})
# ---
# name: test_sensors[mock_soil_sensor][sensor.mock_soil_sensor_moisture-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'moisture',
'friendly_name': 'Mock Soil Sensor Moisture',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'sensor.mock_soil_sensor_moisture',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '50',
})
# ---
# name: test_sensors[mock_solar_inverter][sensor.mock_solar_inverter_active_current-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
-19
View File
@@ -85,25 +85,6 @@ async def test_humidity_sensor(
assert state.state == "40.0"
@pytest.mark.parametrize("node_fixture", ["mock_soil_sensor"])
async def test_soil_moisture_sensor(
hass: HomeAssistant,
matter_client: MagicMock,
matter_node: MatterNode,
) -> None:
"""Test soil moisture sensor."""
state = hass.states.get("sensor.mock_soil_sensor_moisture")
assert state
assert state.state == "50"
set_node_attribute(matter_node, 1, 1072, 1, 75)
await trigger_subscription_callback(hass, matter_client)
state = hass.states.get("sensor.mock_soil_sensor_moisture")
assert state
assert state.state == "75"
@pytest.mark.parametrize("node_fixture", ["mock_light_sensor"])
async def test_light_sensor(
hass: HomeAssistant,
+5 -7
View File
@@ -270,7 +270,7 @@ async def test_state_via_state_topic_through_position(
) -> None:
"""Test the controlling state via topic through position.
Test it is still possible to process a `opening` or `closing`
Test is still possible to process a `opening` or `closing`
state update. Additional we test json messages can be
processed containing both position and state. Incoming
rendered positions are clamped between 0..100.
@@ -308,7 +308,7 @@ async def test_opening_closing_state_is_reset(
) -> None:
"""Test the controlling state via topic through position.
Test an `opening` or `closing` state update is reset
Test a `opening` or `closing` state update is reset
correctly after sequential updates.
"""
await mqtt_mock_entry()
@@ -320,13 +320,11 @@ async def test_opening_closing_state_is_reset(
messages = [
('{"position": 0, "state": "opening"}', ValveState.OPENING, 0),
('{"position": 50, "state": "opening"}', ValveState.OPENING, 50),
# Position-only update at intermediate position resets opening state
('{"position": 60}', ValveState.OPEN, 60),
('{"position": 60}', ValveState.OPENING, 60),
('{"position": 100, "state": "opening"}', ValveState.OPENING, 100),
('{"position": 100, "state": null}', ValveState.OPEN, 100),
('{"position": 90, "state": "closing"}', ValveState.CLOSING, 90),
# Position-only update at intermediate position resets closing state
('{"position": 40}', ValveState.OPEN, 40),
('{"position": 40}', ValveState.CLOSING, 40),
('{"position": 0}', ValveState.CLOSED, 0),
('{"position": 10}', ValveState.OPEN, 10),
('{"position": 0, "state": "opening"}', ValveState.OPENING, 0),
@@ -440,7 +438,7 @@ async def test_state_via_state_trough_position_with_alt_range(
asserted_state: str,
valve_position: int | None,
) -> None:
"""Test controlling state via position with an alternative range.
"""Test controlling state via position with alternative range.
Test is still possible to process a `opening` or `closing`
state update. Additional we test json messages can be
@@ -246,75 +246,3 @@ async def test_subentry_entry_not_loaded(
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "entry_not_loaded"
async def test_reauth_flow(
hass: HomeAssistant,
mock_openai_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test the reauth flow updates the API key."""
mock_config_entry.add_to_hass(hass)
result = await mock_config_entry.start_reauth_flow(hass)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_API_KEY: "new_key"},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_successful"
assert mock_config_entry.data[CONF_API_KEY] == "new_key"
@pytest.mark.parametrize(
("exception", "error"),
[
(
AuthenticationError(
message="invalid key",
response=httpx.Response(
status_code=401,
request=httpx.Request(method="POST", url="https://example.com"),
),
body=None,
),
"invalid_auth",
),
(OpenAIError("boom"), "cannot_connect"),
(Exception("boom"), "unknown"),
],
)
async def test_reauth_flow_errors(
hass: HomeAssistant,
mock_openai_client: AsyncMock,
mock_config_entry: MockConfigEntry,
exception: Exception,
error: str,
) -> None:
"""Test errors during reauth and recovery."""
mock_config_entry.add_to_hass(hass)
result = await mock_config_entry.start_reauth_flow(hass)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reauth_confirm"
mock_openai_client.chat.completions.create.side_effect = exception
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_API_KEY: "new_key"},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": error}
mock_openai_client.chat.completions.create.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{CONF_API_KEY: "new_key"},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_successful"
assert mock_config_entry.data[CONF_API_KEY] == "new_key"
-1
View File
@@ -38,6 +38,5 @@ async def fire_transport_update(
"""Trigger the registered AVTransport callback on the mock device."""
assert mock_device.av_transport_event_callback is not None
mock_device.event_data = {"TransportState": transport_state.value}
mock_device.playing_status = transport_state
mock_device.av_transport_event_callback(MagicMock(), [])
await hass.async_block_till_done()
+411 -9
View File
@@ -1,14 +1,36 @@
"""The tests for the location automation."""
from datetime import timedelta
from typing import Any
from freezegun.api import FrozenDateTimeFactory
import pytest
import voluptuous as vol
from homeassistant.components import automation, zone
from homeassistant.const import ATTR_ENTITY_ID, ENTITY_MATCH_ALL, SERVICE_TURN_OFF
from homeassistant.const import (
ATTR_ENTITY_ID,
ENTITY_MATCH_ALL,
SERVICE_TURN_OFF,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import Context, HomeAssistant, ServiceCall
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.trigger import async_validate_trigger_config
from homeassistant.setup import async_setup_component
from tests.common import mock_component
from tests.common import async_fire_time_changed, mock_component
from tests.components.common import (
TriggerStateDescription,
assert_trigger_behavior_all,
assert_trigger_behavior_each,
assert_trigger_behavior_first,
assert_trigger_options_supported,
parametrize_target_entities,
parametrize_trigger_states,
target_entities,
)
@pytest.fixture(autouse=True)
@@ -343,10 +365,7 @@ async def test_unknown_zone(
},
)
assert (
"Automation 'My Automation' is referencing non-existing zone"
" 'zone.no_such_zone' in a zone trigger" not in caplog.text
)
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" not in caplog.text
hass.states.async_set(
"test.entity",
@@ -356,7 +375,390 @@ async def test_unknown_zone(
)
await hass.async_block_till_done()
assert (
"Automation 'My Automation' is referencing non-existing zone"
" 'zone.no_such_zone' in a zone trigger" in caplog.text
assert "Non-existing zone 'zone.no_such_zone' in a zone trigger" in caplog.text
# --- New-style zone trigger tests ---
ZONE_HOME = "zone.home"
ZONE_WORK = "zone.work"
IN_ZONES_HOME = {"in_zones": [ZONE_HOME]}
IN_ZONES_WORK = {"in_zones": [ZONE_WORK]}
IN_ZONES_NONE: dict[str, list[str]] = {"in_zones": []}
TRIGGER_ZONE = ZONE_HOME
@pytest.mark.parametrize(
("trigger_key", "base_options", "supports_behavior", "supports_duration"),
[
("zone.entered", {"zone": TRIGGER_ZONE}, True, True),
("zone.left", {"zone": TRIGGER_ZONE}, True, True),
],
)
async def test_zone_trigger_options_validation(
hass: HomeAssistant,
trigger_key: str,
base_options: dict[str, Any] | None,
supports_behavior: bool,
supports_duration: bool,
) -> None:
"""Test that zone triggers support the expected options."""
await assert_trigger_options_supported(
hass,
trigger_key,
base_options,
supports_behavior=supports_behavior,
supports_duration=supports_duration,
)
@pytest.mark.parametrize("trigger_key", ["zone.entered", "zone.left"])
async def test_zone_trigger_rejects_non_zone_entity_id(
hass: HomeAssistant, trigger_key: str
) -> None:
"""Test that the zone option must reference entities in the zone domain."""
with pytest.raises(vol.Invalid):
await async_validate_trigger_config(
hass,
[
{
"platform": trigger_key,
"target": {"entity_id": "person.alice"},
"options": {"zone": "person.alice"},
}
],
)
@pytest.fixture
async def target_zone_entities(
hass: HomeAssistant, domain: str
) -> dict[str, list[str]]:
"""Create multiple zone-trackable entities associated with different targets."""
return await target_entities(hass, domain, domain_excluded="sensor")
_ZONE_TRIGGER_STATES = [
*parametrize_trigger_states(
trigger="zone.entered",
trigger_options={"zone": TRIGGER_ZONE},
target_states=[
("home", IN_ZONES_HOME),
],
other_states=[
("not_home", IN_ZONES_NONE),
("Work", IN_ZONES_WORK),
],
),
*parametrize_trigger_states(
trigger="zone.left",
trigger_options={"zone": TRIGGER_ZONE},
target_states=[
("not_home", IN_ZONES_NONE),
("Work", IN_ZONES_WORK),
],
other_states=[
("home", IN_ZONES_HOME),
],
),
]
def _parametrize_zone_target_entities() -> list[tuple[dict[str, Any], str, int, str]]:
"""Parametrize target entities for all supported zone trigger domains."""
return [
(*params, domain)
for domain in ("person", "device_tracker")
for params in parametrize_target_entities(domain)
]
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_each(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when any targeted entity changes."""
await assert_trigger_behavior_each(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_first(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when first targeted entity changes."""
await assert_trigger_behavior_first(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
@pytest.mark.parametrize(
("trigger_target_config", "entity_id", "entities_in_target", "domain"),
_parametrize_zone_target_entities(),
)
@pytest.mark.parametrize(
("trigger", "trigger_options", "states"),
_ZONE_TRIGGER_STATES,
)
async def test_zone_trigger_behavior_all(
hass: HomeAssistant,
target_zone_entities: dict[str, list[str]],
trigger_target_config: dict[str, Any],
entity_id: str,
entities_in_target: int,
trigger: str,
trigger_options: dict[str, Any],
states: list[TriggerStateDescription],
) -> None:
"""Test zone triggers fire when last targeted entity changes."""
await assert_trigger_behavior_all(
hass,
target_entities=target_zone_entities,
trigger_target_config=trigger_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
trigger=trigger,
trigger_options=trigger_options,
states=states,
)
# --- Zone occupancy trigger tests ---
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key"),
["zone.occupancy_detected", "zone.occupancy_cleared"],
)
async def test_zone_occupancy_trigger_options_validation(
hass: HomeAssistant,
trigger_key: str,
) -> None:
"""Test that occupancy triggers support the expected options."""
await assert_trigger_options_supported(
hass,
trigger_key,
{"zone": ZONE_HOME},
supports_behavior=False,
supports_duration=True,
supports_target=False,
)
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key", "from_state", "to_state", "should_fire"),
[
# occupancy_detected
pytest.param("zone.occupancy_detected", "0", "1", True, id="detected_0_to_1"),
pytest.param("zone.occupancy_detected", "0", "3", True, id="detected_0_to_3"),
pytest.param("zone.occupancy_detected", "1", "2", False, id="detected_1_to_2"),
pytest.param("zone.occupancy_detected", "2", "0", False, id="detected_2_to_0"),
pytest.param(
"zone.occupancy_detected",
STATE_UNKNOWN,
"1",
False,
id="detected_unknown_to_1",
),
pytest.param(
"zone.occupancy_detected",
STATE_UNAVAILABLE,
"1",
False,
id="detected_unavailable_to_1",
),
pytest.param(
"zone.occupancy_detected",
"0",
STATE_UNAVAILABLE,
False,
id="detected_0_to_unavailable",
),
# occupancy_cleared
pytest.param("zone.occupancy_cleared", "1", "0", True, id="cleared_1_to_0"),
pytest.param("zone.occupancy_cleared", "3", "0", True, id="cleared_3_to_0"),
pytest.param("zone.occupancy_cleared", "2", "1", False, id="cleared_2_to_1"),
pytest.param("zone.occupancy_cleared", "0", "1", False, id="cleared_0_to_1"),
pytest.param(
"zone.occupancy_cleared",
"1",
STATE_UNAVAILABLE,
False,
id="cleared_1_to_unavailable",
),
pytest.param(
"zone.occupancy_cleared",
"1",
STATE_UNKNOWN,
False,
id="cleared_1_to_unknown",
),
],
)
async def test_zone_occupancy_trigger_transitions(
hass: HomeAssistant,
service_calls: list[ServiceCall],
trigger_key: str,
from_state: str,
to_state: str,
should_fire: bool,
) -> None:
"""Test occupancy triggers fire on the expected numeric-state transitions."""
hass.states.async_set(ZONE_HOME, from_state)
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": trigger_key,
"options": {"zone": ZONE_HOME},
},
"action": {"service": "test.automation"},
}
},
)
hass.states.async_set(ZONE_HOME, to_state)
await hass.async_block_till_done()
assert (len(service_calls) == 1) is should_fire
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("trigger_key", "from_value", "to_value", "revert_value"),
[
("zone.occupancy_detected", "0", "1", "0"),
("zone.occupancy_cleared", "1", "0", "1"),
],
)
async def test_zone_occupancy_trigger_for_duration(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
service_calls: list[ServiceCall],
trigger_key: str,
from_value: str,
to_value: str,
revert_value: str,
) -> None:
"""Test that `for` delays the firing and an early revert cancels it."""
hass.states.async_set(ZONE_HOME, from_value)
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": trigger_key,
"options": {"zone": ZONE_HOME, "for": {"seconds": 5}},
},
"action": {"service": "test.automation"},
}
},
)
# Transition, then revert before the duration elapses -> no fire.
hass.states.async_set(ZONE_HOME, to_value)
await hass.async_block_till_done()
hass.states.async_set(ZONE_HOME, revert_value)
await hass.async_block_till_done()
freezer.tick(timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(service_calls) == 0
# Transition and hold past the duration -> fire once.
hass.states.async_set(ZONE_HOME, to_value)
await hass.async_block_till_done()
freezer.tick(timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(service_calls) == 1
@pytest.mark.usefixtures("enable_labs_preview_features")
async def test_zone_occupancy_trigger_payload(
hass: HomeAssistant, service_calls: list[ServiceCall]
) -> None:
"""Test the payload exposed to the action template."""
hass.states.async_set(ZONE_HOME, "0")
await hass.async_block_till_done()
assert await async_setup_component(
hass,
automation.DOMAIN,
{
automation.DOMAIN: {
"trigger": {
"trigger": "zone.occupancy_detected",
"options": {"zone": ZONE_HOME},
},
"action": {
"service": "test.automation",
"data_template": {
"some": (
"{{ trigger.entity_id }}"
" - {{ trigger.from_state.state }}"
" - {{ trigger.to_state.state }}"
" - {{ trigger.for }}"
)
},
},
}
},
)
hass.states.async_set(ZONE_HOME, "2")
await hass.async_block_till_done()
assert len(service_calls) == 1
assert service_calls[0].data["some"] == f"{ZONE_HOME} - 0 - 2 - None"
+137 -19
View File
@@ -14,6 +14,7 @@ from pytest_unordered import unordered
import voluptuous as vol
from homeassistant.components import automation
from homeassistant.components.device_automation import DEVICE_TRIGGER_BASE_SCHEMA
from homeassistant.components.sun import DOMAIN as SUN_DOMAIN
from homeassistant.components.system_health import DOMAIN as SYSTEM_HEALTH_DOMAIN
from homeassistant.components.tag import DOMAIN as TAG_DOMAIN
@@ -41,7 +42,11 @@ from homeassistant.core import (
callback,
)
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, trigger
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
trigger,
)
from homeassistant.helpers.automation import (
DomainSpec,
move_top_level_schema_fields_to_options,
@@ -78,6 +83,7 @@ from homeassistant.util.unit_conversion import TemperatureConverter
from homeassistant.util.yaml.loader import parse_yaml
from tests.common import (
MockConfigEntry,
MockModule,
MockPlatform,
async_fire_time_changed,
@@ -4618,6 +4624,34 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
unsub()
@pytest.fixture
def mock_test_modern_trigger(hass: HomeAssistant) -> None:
"""Register a mock 'test' integration and trigger platform exposing 'test.modern'."""
class MockModernTrigger(Trigger):
"""Mock modern trigger that accepts any options/target."""
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
return config
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
return lambda: None
async def async_get_triggers(
hass: HomeAssistant,
) -> dict[str, type[Trigger]]:
return {"modern": MockModernTrigger}
mock_integration(hass, MockModule("test"))
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
@pytest.mark.usefixtures("mock_test_modern_trigger")
@pytest.mark.parametrize(
("trigger_conf", "expected"),
[
@@ -4627,7 +4661,7 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
id="state",
),
pytest.param(
{"platform": "numeric_state", "entity_id": ["sensor.a"]},
{"platform": "numeric_state", "entity_id": ["sensor.a"], "above": 5},
["sensor.a"],
id="numeric_state",
),
@@ -4639,36 +4673,61 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
pytest.param(
{
"platform": "zone",
"entity_id": ["person.a"],
"zone": "zone.home",
"event": "enter",
"options": {
"entity_id": ["person.a"],
"zone": "zone.home",
"event": "enter",
},
},
["person.a", "zone.home"],
id="zone-legacy",
),
pytest.param(
{"platform": "geo_location", "zone": "zone.home"},
{
"platform": "zone.entered",
"target": {"entity_id": ["person.a", "device_tracker.b"]},
"options": {"zone": "zone.home"},
},
["person.a", "device_tracker.b", "zone.home"],
id="zone-entered-modern",
),
pytest.param(
{
"platform": "zone.left",
"target": {"entity_id": "person.a"},
"options": {"zone": "zone.home"},
},
["person.a", "zone.home"],
id="zone-left-modern",
),
pytest.param(
{"platform": "geo_location", "zone": "zone.home", "source": "test"},
["zone.home"],
id="geo_location",
),
pytest.param(
{"platform": "sun"},
{"platform": "sun", "event": "sunrise"},
["sun.sun"],
id="sun",
),
pytest.param(
{"platform": "event", "event_data": {"entity_id": "sensor.x"}},
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": "sensor.x"},
},
["sensor.x"],
id="event-with-entity-id",
),
pytest.param(
{"platform": "event"},
{"platform": "event", "event_type": "test_event"},
[],
id="event-without-entity-id",
),
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": "not-a-valid-entity-id"},
},
[],
@@ -4677,6 +4736,7 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
pytest.param(
{
"platform": "event",
"event_type": "test_event",
"event_data": {"entity_id": ["sensor.x", "sensor.y"]},
},
[],
@@ -4707,38 +4767,89 @@ async def test_entity_trigger_duration_cancelled_on_invalid_state(
),
],
)
def test_async_extract_entities(
trigger_conf: dict[str, Any], expected: list[str]
async def test_async_extract_entities(
hass: HomeAssistant,
trigger_conf: dict[str, Any],
expected: list[str],
) -> None:
"""Test extracting entities from various trigger config shapes."""
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
assert trigger.async_extract_entities(trigger_conf) == expected
_MOCK_DEVICE_ID = "_mock_device_id_"
@pytest.fixture
async def mock_device_automation(hass: HomeAssistant) -> str:
"""Register a mock 'test' integration, device_trigger platform, and device.
Returns the device id, which tests substitute for _MOCK_DEVICE_ID in their
parametrized configs so the device platform branch can validate properly.
"""
mock_integration(hass, MockModule("test"))
mock_platform(
hass,
"test.device_trigger",
Mock(
TRIGGER_SCHEMA=DEVICE_TRIGGER_BASE_SCHEMA.extend({}, extra=vol.ALLOW_EXTRA)
),
)
config_entry = MockConfigEntry(domain="test")
config_entry.add_to_hass(hass)
device = dr.async_get(hass).async_get_or_create(
config_entry_id=config_entry.entry_id,
identifiers={("test", "test")},
)
return device.id
def _substitute(obj: Any, placeholder: str, value: str) -> Any:
"""Recursively replace `placeholder` with `value` inside lists/dicts."""
if isinstance(obj, dict):
return {k: _substitute(v, placeholder, value) for k, v in obj.items()}
if isinstance(obj, list):
return [_substitute(v, placeholder, value) for v in obj]
return value if obj == placeholder else obj
@pytest.mark.parametrize(
("trigger_conf", "expected"),
[
pytest.param(
{"platform": "device", "device_id": "abc123"},
["abc123"],
{
"platform": "device",
"device_id": _MOCK_DEVICE_ID,
"domain": "test",
},
[_MOCK_DEVICE_ID],
id="device",
),
pytest.param(
{"platform": "event", "event_data": {"device_id": "abc123"}},
{
"platform": "event",
"event_type": "test_event",
"event_data": {"device_id": "abc123"},
},
["abc123"],
id="event-with-device-id",
),
pytest.param(
{"platform": "event"},
{"platform": "event", "event_type": "test_event"},
[],
id="event-without-device-id",
),
pytest.param(
{"platform": "tag", "device_id": ["abc123", "def456"]},
{
"platform": "tag",
"tag_id": "mytag",
"device_id": ["abc123", "def456"],
},
["abc123", "def456"],
id="tag-with-device-id",
),
pytest.param(
{"platform": "tag"},
{"platform": "tag", "tag_id": "mytag"},
[],
id="tag-without-device-id",
),
@@ -4762,8 +4873,15 @@ def test_async_extract_entities(
),
],
)
def test_async_extract_devices(
trigger_conf: dict[str, Any], expected: list[str]
async def test_async_extract_devices(
hass: HomeAssistant,
mock_test_modern_trigger: None,
mock_device_automation: str,
trigger_conf: dict[str, Any],
expected: list[str],
) -> None:
"""Test extracting devices from various trigger config shapes."""
trigger_conf = _substitute(trigger_conf, _MOCK_DEVICE_ID, mock_device_automation)
expected = _substitute(expected, _MOCK_DEVICE_ID, mock_device_automation)
[trigger_conf] = await trigger.async_validate_trigger_config(hass, [trigger_conf])
assert trigger.async_extract_devices(trigger_conf) == expected