Compare commits

..

1 Commits

Author SHA1 Message Date
Erik
4b5fe4766e Add vacuum cleaner conditions 2026-01-21 16:52:16 +01:00
57 changed files with 1602 additions and 1072 deletions

View File

@@ -1,6 +1,6 @@
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.14.13
rev: v0.13.0
hooks:
- id: ruff-check
args:

View File

@@ -14,7 +14,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "Alarm is armed"
"name": "If an alarm is armed"
},
"is_armed_away": {
"description": "Tests if one or more alarms are armed in away mode.",
@@ -24,7 +24,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "Alarm is armed away"
"name": "If an alarm is armed away"
},
"is_armed_home": {
"description": "Tests if one or more alarms are armed in home mode.",
@@ -34,7 +34,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "Alarm is armed home"
"name": "If an alarm is armed home"
},
"is_armed_night": {
"description": "Tests if one or more alarms are armed in night mode.",
@@ -44,7 +44,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "Alarm is armed night"
"name": "If an alarm is armed night"
},
"is_armed_vacation": {
"description": "Tests if one or more alarms are armed in vacation mode.",
@@ -54,7 +54,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "Alarm is armed vacation"
"name": "If an alarm is armed vacation"
},
"is_disarmed": {
"description": "Tests if one or more alarms are disarmed.",
@@ -64,7 +64,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "Alarm is disarmed"
"name": "If an alarm is disarmed"
},
"is_triggered": {
"description": "Tests if one or more alarms are triggered.",
@@ -74,7 +74,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "Alarm is triggered"
"name": "If an alarm is triggered"
}
},
"device_automation": {

View File

@@ -5,14 +5,9 @@ from __future__ import annotations
import asyncio
import logging
from random import randrange
import sys
from typing import Any, cast
from pyatv import connect, exceptions, scan
from pyatv.conf import AppleTV
from pyatv.const import DeviceModel, Protocol
from pyatv.convert import model_str
from pyatv.interface import AppleTV as AppleTVInterface, DeviceListener
from homeassistant.components import zeroconf
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
@@ -29,7 +24,11 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.exceptions import (
ConfigEntryAuthFailed,
ConfigEntryNotReady,
HomeAssistantError,
)
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.dispatcher import async_dispatcher_send
@@ -43,6 +42,18 @@ from .const import (
SIGNAL_DISCONNECTED,
)
if sys.version_info < (3, 14):
from pyatv import connect, exceptions, scan
from pyatv.conf import AppleTV
from pyatv.const import DeviceModel, Protocol
from pyatv.convert import model_str
from pyatv.interface import AppleTV as AppleTVInterface, DeviceListener
else:
class DeviceListener:
"""Dummy class."""
_LOGGER = logging.getLogger(__name__)
DEFAULT_NAME_TV = "Apple TV"
@@ -53,25 +64,30 @@ BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes
PLATFORMS = [Platform.MEDIA_PLAYER, Platform.REMOTE]
AUTH_EXCEPTIONS = (
exceptions.AuthenticationError,
exceptions.InvalidCredentialsError,
exceptions.NoCredentialsError,
)
CONNECTION_TIMEOUT_EXCEPTIONS = (
OSError,
asyncio.CancelledError,
TimeoutError,
exceptions.ConnectionLostError,
exceptions.ConnectionFailedError,
)
DEVICE_EXCEPTIONS = (
exceptions.ProtocolError,
exceptions.NoServiceError,
exceptions.PairingError,
exceptions.BackOffError,
exceptions.DeviceIdMissingError,
)
if sys.version_info < (3, 14):
AUTH_EXCEPTIONS = (
exceptions.AuthenticationError,
exceptions.InvalidCredentialsError,
exceptions.NoCredentialsError,
)
CONNECTION_TIMEOUT_EXCEPTIONS = (
OSError,
asyncio.CancelledError,
TimeoutError,
exceptions.ConnectionLostError,
exceptions.ConnectionFailedError,
)
DEVICE_EXCEPTIONS = (
exceptions.ProtocolError,
exceptions.NoServiceError,
exceptions.PairingError,
exceptions.BackOffError,
exceptions.DeviceIdMissingError,
)
else:
AUTH_EXCEPTIONS = ()
CONNECTION_TIMEOUT_EXCEPTIONS = ()
DEVICE_EXCEPTIONS = ()
type AppleTvConfigEntry = ConfigEntry[AppleTVManager]
@@ -79,6 +95,10 @@ type AppleTvConfigEntry = ConfigEntry[AppleTVManager]
async def async_setup_entry(hass: HomeAssistant, entry: AppleTvConfigEntry) -> bool:
"""Set up a config entry for Apple TV."""
if sys.version_info >= (3, 14):
raise HomeAssistantError(
"Apple TV is not supported on Python 3.14. Please use Python 3.13."
)
manager = AppleTVManager(hass, entry)
if manager.is_on:

View File

@@ -8,7 +8,7 @@
"integration_type": "device",
"iot_class": "local_push",
"loggers": ["pyatv", "srptools"],
"requirements": ["pyatv==0.17.0"],
"requirements": ["pyatv==0.16.1;python_version<'3.14'"],
"zeroconf": [
"_mediaremotetv._tcp.local.",
"_companion-link._tcp.local.",

View File

@@ -239,15 +239,6 @@ class AppleTvMediaPlayer(
"""
self.async_write_ha_state()
@callback
def volume_device_update(
self, output_device: OutputDevice, old_level: float, new_level: float
) -> None:
"""Output device volume was updated.
This is a callback function from pyatv.interface.AudioListener.
"""
@callback
def outputdevices_update(
self, old_devices: list[OutputDevice], new_devices: list[OutputDevice]

View File

@@ -14,7 +14,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "Satellite is idle"
"name": "If a satellite is idle"
},
"is_listening": {
"description": "Tests if one or more Assist satellites are listening.",
@@ -24,7 +24,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "Satellite is listening"
"name": "If a satellite is listening"
},
"is_processing": {
"description": "Tests if one or more Assist satellites are processing.",
@@ -34,7 +34,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "Satellite is processing"
"name": "If a satellite is processing"
},
"is_responding": {
"description": "Tests if one or more Assist satellites are responding.",
@@ -44,7 +44,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "Satellite is responding"
"name": "If a satellite is responding"
}
},
"entity_component": {

View File

@@ -128,6 +128,7 @@ _EXPERIMENTAL_CONDITION_PLATFORMS = {
"fan",
"light",
"siren",
"vacuum",
}
_EXPERIMENTAL_TRIGGER_PLATFORMS = {

View File

@@ -14,7 +14,7 @@
"name": "[%key:component::fan::common::condition_behavior_name%]"
}
},
"name": "Fan is off"
"name": "If a fan is off"
},
"is_on": {
"description": "Tests if one or more fans are on.",
@@ -24,7 +24,7 @@
"name": "[%key:component::fan::common::condition_behavior_name%]"
}
},
"name": "Fan is on"
"name": "If a fan is on"
}
},
"device_automation": {

View File

@@ -12,6 +12,9 @@
},
"non_methane_hydrocarbons": {
"default": "mdi:molecule"
},
"ozone": {
"default": "mdi:molecule"
}
}
}

View File

@@ -154,8 +154,8 @@ AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (
),
AirQualitySensorEntityDescription(
key="o3",
translation_key="ozone",
state_class=SensorStateClass.MEASUREMENT,
device_class=SensorDeviceClass.OZONE,
native_unit_of_measurement_fn=lambda x: x.pollutants.o3.concentration.units,
exists_fn=lambda x: "o3" in {p.code for p in x.pollutants},
value_fn=lambda x: x.pollutants.o3.concentration.value,

View File

@@ -211,6 +211,9 @@
"non_methane_hydrocarbons": {
"name": "Non-methane hydrocarbons"
},
"ozone": {
"name": "[%key:component::sensor::entity_component::ozone::name%]"
},
"uaqi": {
"name": "Universal Air Quality Index"
},

View File

@@ -7,5 +7,5 @@
"integration_type": "device",
"iot_class": "local_polling",
"quality_scale": "silver",
"requirements": ["hdfury==1.4.2"]
"requirements": ["hdfury==1.3.1"]
}

View File

@@ -49,7 +49,7 @@
"name": "[%key:component::light::common::condition_behavior_name%]"
}
},
"name": "Light is off"
"name": "If a light is off"
},
"is_on": {
"description": "Tests if one or more lights are on.",
@@ -59,7 +59,7 @@
"name": "[%key:component::light::common::condition_behavior_name%]"
}
},
"name": "Light is on"
"name": "If a light is on"
}
},
"device_automation": {

View File

@@ -43,7 +43,6 @@ ATTR_ICON = "icon"
ATTR_MARKDOWN = "markdown"
ATTR_PRIORITY = "priority"
ATTR_TAGS = "tags"
ATTR_SEQUENCE_ID = "sequence_id"
SERVICE_PUBLISH_SCHEMA = cv.make_entity_service_schema(
{
@@ -61,7 +60,6 @@ SERVICE_PUBLISH_SCHEMA = cv.make_entity_service_schema(
vol.Optional(ATTR_EMAIL): vol.Email(),
vol.Optional(ATTR_CALL): cv.string,
vol.Optional(ATTR_ICON): vol.All(vol.Url(), vol.Coerce(URL)),
vol.Optional(ATTR_SEQUENCE_ID): cv.string,
}
)

View File

@@ -88,8 +88,3 @@ publish:
type: url
autocomplete: url
example: https://example.org/logo.png
sequence_id:
required: false
selector:
text:
example: "Mc3otamDNcpJ"

View File

@@ -1,7 +1,6 @@
{
"common": {
"add_topic_description": "Set up a topic for notifications.",
"sequence_id": "Sequence ID",
"topic": "Topic"
},
"config": {
@@ -172,9 +171,6 @@
"icon": { "name": "Icon" },
"message": { "name": "Message" },
"priority": { "name": "Priority" },
"sequence_id": {
"name": "[%key:component::ntfy::common::sequence_id%]"
},
"tags": { "name": "Tags" },
"time": { "name": "Time" },
"title": { "name": "Title" },
@@ -360,10 +356,6 @@
"description": "All messages have a priority that defines how urgently your phone notifies you, depending on the configured vibration patterns, notification sounds, and visibility in the notification drawer or pop-over.",
"name": "Message priority"
},
"sequence_id": {
"description": "Enter a message or sequence ID to update an existing notification, or specify a sequence ID to reference later when updating, clearing (mark as read and dismiss), or deleting a notification.",
"name": "[%key:component::ntfy::common::sequence_id%]"
},
"tags": {
"description": "Add tags or emojis to the notification. Emojis (using shortcodes like smile) will appear in the notification title or message. Other tags will be displayed below the notification content.",
"name": "Tags/Emojis"

View File

@@ -7,9 +7,6 @@
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown_license_plate": "Unknown license plate"
},
"initiate_flow": {
"user": "Add vehicle"
},
"step": {
"user": {
"data": {

View File

@@ -49,8 +49,8 @@ DEFAULT_NAME = "Template Select"
SELECT_COMMON_SCHEMA = vol.Schema(
{
vol.Required(ATTR_OPTIONS): cv.template,
vol.Required(CONF_SELECT_OPTION): cv.SCRIPT_SCHEMA,
vol.Optional(ATTR_OPTIONS): cv.template,
vol.Optional(CONF_SELECT_OPTION): cv.SCRIPT_SCHEMA,
vol.Optional(CONF_STATE): cv.template,
}
)

View File

@@ -8,7 +8,6 @@ import dataclasses
from uiprotect.data import (
NVR,
Camera,
Event,
ModelType,
MountType,
ProtectAdoptableDeviceModel,
@@ -645,31 +644,6 @@ class ProtectEventBinarySensor(EventEntityMixin, BinarySensorEntity):
self._attr_is_on = False
self._attr_extra_state_attributes = {}
@callback
def _find_active_event_with_object_type(
self, device: ProtectDeviceType
) -> Event | None:
"""Find an active event containing this sensor's object type.
Fallback for issue #152133: last_smart_detect_event_ids may not update
immediately when a new detection type is added to an ongoing event.
"""
obj_type = self.entity_description.ufp_obj_type
if obj_type is None or not isinstance(device, Camera):
return None
# Check known active event IDs from camera first (fast path)
for event_id in device.last_smart_detect_event_ids.values():
if (
event_id
and (event := self.data.api.bootstrap.events.get(event_id))
and event.end is None
and obj_type in event.smart_detect_types
):
return event
return None
@callback
def _async_update_device_from_protect(self, device: ProtectDeviceType) -> None:
description = self.entity_description
@@ -677,15 +651,9 @@ class ProtectEventBinarySensor(EventEntityMixin, BinarySensorEntity):
prev_event = self._event
prev_event_end = self._event_end
super()._async_update_device_from_protect(device)
event = description.get_event_obj(device)
if event is None:
# Fallback for #152133: check active events directly
event = self._find_active_event_with_object_type(device)
if event:
if event := description.get_event_obj(device):
self._event = event
self._event_end = event.end
self._event_end = event.end if event else None
if not (
event

View File

@@ -41,7 +41,7 @@
"iot_class": "local_push",
"loggers": ["uiprotect", "unifi_discovery"],
"quality_scale": "platinum",
"requirements": ["uiprotect==10.0.1", "unifi-discovery==1.2.0"],
"requirements": ["uiprotect==10.0.0", "unifi-discovery==1.2.0"],
"ssdp": [
{
"manufacturer": "Ubiquiti Networks",

View File

@@ -0,0 +1,21 @@
"""Provides conditions for vacuum cleaners."""
from homeassistant.core import HomeAssistant
from homeassistant.helpers.condition import Condition, make_entity_state_condition
from .const import DOMAIN, VacuumActivity
CONDITIONS: dict[str, type[Condition]] = {
"is_cleaning": make_entity_state_condition(DOMAIN, VacuumActivity.CLEANING),
"is_docked": make_entity_state_condition(DOMAIN, VacuumActivity.DOCKED),
"is_encountering_an_error": make_entity_state_condition(
DOMAIN, VacuumActivity.ERROR
),
"is_paused": make_entity_state_condition(DOMAIN, VacuumActivity.PAUSED),
"is_returning": make_entity_state_condition(DOMAIN, VacuumActivity.RETURNING),
}
async def async_get_conditions(hass: HomeAssistant) -> dict[str, type[Condition]]:
"""Return the conditions for vacuum cleaners."""
return CONDITIONS

View File

@@ -0,0 +1,20 @@
.condition_common: &condition_common
target:
entity:
domain: vacuum
fields:
behavior:
required: true
default: any
selector:
select:
options:
- all
- any
translation_key: condition_behavior
is_cleaning: *condition_common
is_docked: *condition_common
is_encountering_an_error: *condition_common
is_paused: *condition_common
is_returning: *condition_common

View File

@@ -1,4 +1,21 @@
{
"conditions": {
"is_cleaning": {
"condition": "mdi:play"
},
"is_docked": {
"condition": "mdi:home-import-outline"
},
"is_encountering_an_error": {
"condition": "mdi:alert-circle-outline"
},
"is_paused": {
"condition": "mdi:pause"
},
"is_returning": {
"condition": "mdi:home-import-outline"
}
},
"entity_component": {
"_": {
"default": "mdi:robot-vacuum"

View File

@@ -1,8 +1,62 @@
{
"common": {
"condition_behavior_description": "How the state should match on the targeted vacuum cleaners.",
"condition_behavior_name": "Behavior",
"trigger_behavior_description": "The behavior of the targeted vacuum cleaners to trigger on.",
"trigger_behavior_name": "Behavior"
},
"conditions": {
"is_cleaning": {
"description": "Tests if one or more vacuum cleaners are cleaning.",
"fields": {
"behavior": {
"description": "[%key:component::vacuum::common::condition_behavior_description%]",
"name": "[%key:component::vacuum::common::condition_behavior_name%]"
}
},
"name": "If a vacuum cleaner is cleaning"
},
"is_docked": {
"description": "Tests if one or more vacuum cleaners are docked.",
"fields": {
"behavior": {
"description": "[%key:component::vacuum::common::condition_behavior_description%]",
"name": "[%key:component::vacuum::common::condition_behavior_name%]"
}
},
"name": "If a vacuum cleaner is docked"
},
"is_encountering_an_error": {
"description": "Tests if one or more vacuum cleaners are encountering an error.",
"fields": {
"behavior": {
"description": "[%key:component::vacuum::common::condition_behavior_description%]",
"name": "[%key:component::vacuum::common::condition_behavior_name%]"
}
},
"name": "If a vacuum cleaner is encountering an error"
},
"is_paused": {
"description": "Tests if one or more vacuum cleaners are paused.",
"fields": {
"behavior": {
"description": "[%key:component::vacuum::common::condition_behavior_description%]",
"name": "[%key:component::vacuum::common::condition_behavior_name%]"
}
},
"name": "If a vacuum cleaner is paused"
},
"is_returning": {
"description": "Tests if one or more vacuum cleaners are returning to the dock.",
"fields": {
"behavior": {
"description": "[%key:component::vacuum::common::condition_behavior_description%]",
"name": "[%key:component::vacuum::common::condition_behavior_name%]"
}
},
"name": "If a vacuum cleaner is returning"
}
},
"device_automation": {
"action_type": {
"clean": "Let {entity_name} clean",
@@ -36,6 +90,12 @@
}
},
"selector": {
"condition_behavior": {
"options": {
"all": "All",
"any": "Any"
}
},
"trigger_behavior": {
"options": {
"any": "Any",

View File

@@ -188,52 +188,6 @@ class BaseUnitConverter:
return (from_unit in cls._UNIT_INVERSES) != (to_unit in cls._UNIT_INVERSES)
class ApparentPowerConverter(BaseUnitConverter):
"""Utility to convert apparent power values."""
UNIT_CLASS = "apparent_power"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfApparentPower.MILLIVOLT_AMPERE: 1 * 1000,
UnitOfApparentPower.VOLT_AMPERE: 1,
UnitOfApparentPower.KILO_VOLT_AMPERE: 1 / 1000,
}
VALID_UNITS = {
UnitOfApparentPower.MILLIVOLT_AMPERE,
UnitOfApparentPower.VOLT_AMPERE,
UnitOfApparentPower.KILO_VOLT_AMPERE,
}
class AreaConverter(BaseUnitConverter):
"""Utility to convert area values."""
UNIT_CLASS = "area"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfArea.SQUARE_METERS: 1,
UnitOfArea.SQUARE_CENTIMETERS: 1 / _CM2_TO_M2,
UnitOfArea.SQUARE_MILLIMETERS: 1 / _MM2_TO_M2,
UnitOfArea.SQUARE_KILOMETERS: 1 / _KM2_TO_M2,
UnitOfArea.SQUARE_INCHES: 1 / _IN2_TO_M2,
UnitOfArea.SQUARE_FEET: 1 / _FT2_TO_M2,
UnitOfArea.SQUARE_YARDS: 1 / _YD2_TO_M2,
UnitOfArea.SQUARE_MILES: 1 / _MI2_TO_M2,
UnitOfArea.ACRES: 1 / _ACRE_TO_M2,
UnitOfArea.HECTARES: 1 / _HECTARE_TO_M2,
}
VALID_UNITS = set(UnitOfArea)
class BloodGlucoseConcentrationConverter(BaseUnitConverter):
"""Utility to convert blood glucose concentration values."""
UNIT_CLASS = "blood_glucose_concentration"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfBloodGlucoseConcentration.MILLIGRAMS_PER_DECILITER: 18,
UnitOfBloodGlucoseConcentration.MILLIMOLE_PER_LITER: 1,
}
VALID_UNITS = set(UnitOfBloodGlucoseConcentration)
class CarbonMonoxideConcentrationConverter(BaseUnitConverter):
"""Convert carbon monoxide ratio to mass per volume.
@@ -259,16 +213,36 @@ class CarbonMonoxideConcentrationConverter(BaseUnitConverter):
}
class ConductivityConverter(BaseUnitConverter):
"""Utility to convert electric current values."""
class NitrogenDioxideConcentrationConverter(BaseUnitConverter):
"""Convert nitrogen dioxide ratio to mass per volume."""
UNIT_CLASS = "conductivity"
UNIT_CLASS = "nitrogen_dioxide"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfConductivity.MICROSIEMENS_PER_CM: 1,
UnitOfConductivity.MILLISIEMENS_PER_CM: 1e-3,
UnitOfConductivity.SIEMENS_PER_CM: 1e-6,
CONCENTRATION_PARTS_PER_BILLION: 1e9,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: (
_NITROGEN_DIOXIDE_MOLAR_MASS / _AMBIENT_IDEAL_GAS_MOLAR_VOLUME * 1e6
),
}
VALID_UNITS = {
CONCENTRATION_PARTS_PER_BILLION,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
}
class SulphurDioxideConcentrationConverter(BaseUnitConverter):
"""Convert sulphur dioxide ratio to mass per volume."""
UNIT_CLASS = "sulphur_dioxide"
_UNIT_CONVERSION: dict[str | None, float] = {
CONCENTRATION_PARTS_PER_BILLION: 1e9,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: (
_SULPHUR_DIOXIDE_MOLAR_MASS / _AMBIENT_IDEAL_GAS_MOLAR_VOLUME * 1e6
),
}
VALID_UNITS = {
CONCENTRATION_PARTS_PER_BILLION,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
}
VALID_UNITS = set(UnitOfConductivity)
class DataRateConverter(BaseUnitConverter):
@@ -292,6 +266,25 @@ class DataRateConverter(BaseUnitConverter):
VALID_UNITS = set(UnitOfDataRate)
class AreaConverter(BaseUnitConverter):
"""Utility to convert area values."""
UNIT_CLASS = "area"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfArea.SQUARE_METERS: 1,
UnitOfArea.SQUARE_CENTIMETERS: 1 / _CM2_TO_M2,
UnitOfArea.SQUARE_MILLIMETERS: 1 / _MM2_TO_M2,
UnitOfArea.SQUARE_KILOMETERS: 1 / _KM2_TO_M2,
UnitOfArea.SQUARE_INCHES: 1 / _IN2_TO_M2,
UnitOfArea.SQUARE_FEET: 1 / _FT2_TO_M2,
UnitOfArea.SQUARE_YARDS: 1 / _YD2_TO_M2,
UnitOfArea.SQUARE_MILES: 1 / _MI2_TO_M2,
UnitOfArea.ACRES: 1 / _ACRE_TO_M2,
UnitOfArea.HECTARES: 1 / _HECTARE_TO_M2,
}
VALID_UNITS = set(UnitOfArea)
class DistanceConverter(BaseUnitConverter):
"""Utility to convert distance values."""
@@ -320,28 +313,27 @@ class DistanceConverter(BaseUnitConverter):
}
class DurationConverter(BaseUnitConverter):
"""Utility to convert duration values."""
class BloodGlucoseConcentrationConverter(BaseUnitConverter):
"""Utility to convert blood glucose concentration values."""
UNIT_CLASS = "duration"
UNIT_CLASS = "blood_glucose_concentration"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfTime.MICROSECONDS: 1000000,
UnitOfTime.MILLISECONDS: 1000,
UnitOfTime.SECONDS: 1,
UnitOfTime.MINUTES: 1 / _MIN_TO_SEC,
UnitOfTime.HOURS: 1 / _HRS_TO_SECS,
UnitOfTime.DAYS: 1 / _DAYS_TO_SECS,
UnitOfTime.WEEKS: 1 / (7 * _DAYS_TO_SECS),
UnitOfBloodGlucoseConcentration.MILLIGRAMS_PER_DECILITER: 18,
UnitOfBloodGlucoseConcentration.MILLIMOLE_PER_LITER: 1,
}
VALID_UNITS = {
UnitOfTime.MICROSECONDS,
UnitOfTime.MILLISECONDS,
UnitOfTime.SECONDS,
UnitOfTime.MINUTES,
UnitOfTime.HOURS,
UnitOfTime.DAYS,
UnitOfTime.WEEKS,
VALID_UNITS = set(UnitOfBloodGlucoseConcentration)
class ConductivityConverter(BaseUnitConverter):
"""Utility to convert electric current values."""
UNIT_CLASS = "conductivity"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfConductivity.MICROSIEMENS_PER_CM: 1,
UnitOfConductivity.MILLISIEMENS_PER_CM: 1e-3,
UnitOfConductivity.SIEMENS_PER_CM: 1e-6,
}
VALID_UNITS = set(UnitOfConductivity)
class ElectricCurrentConverter(BaseUnitConverter):
@@ -470,51 +462,19 @@ class MassConverter(BaseUnitConverter):
}
class MassVolumeConcentrationConverter(BaseUnitConverter):
"""Utility to convert mass volume concentration values."""
class ApparentPowerConverter(BaseUnitConverter):
"""Utility to convert apparent power values."""
UNIT_CLASS = "concentration"
UNIT_CLASS = "apparent_power"
_UNIT_CONVERSION: dict[str | None, float] = {
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: 1000000.0, # 1000 µg/m³ = 1 mg/m³
CONCENTRATION_MILLIGRAMS_PER_CUBIC_METER: 1000.0, # 1000 mg/m³ = 1 g/m³
CONCENTRATION_GRAMS_PER_CUBIC_METER: 1.0,
UnitOfApparentPower.MILLIVOLT_AMPERE: 1 * 1000,
UnitOfApparentPower.VOLT_AMPERE: 1,
UnitOfApparentPower.KILO_VOLT_AMPERE: 1 / 1000,
}
VALID_UNITS = {
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
CONCENTRATION_MILLIGRAMS_PER_CUBIC_METER,
CONCENTRATION_GRAMS_PER_CUBIC_METER,
}
class NitrogenDioxideConcentrationConverter(BaseUnitConverter):
"""Convert nitrogen dioxide ratio to mass per volume."""
UNIT_CLASS = "nitrogen_dioxide"
_UNIT_CONVERSION: dict[str | None, float] = {
CONCENTRATION_PARTS_PER_BILLION: 1e9,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: (
_NITROGEN_DIOXIDE_MOLAR_MASS / _AMBIENT_IDEAL_GAS_MOLAR_VOLUME * 1e6
),
}
VALID_UNITS = {
CONCENTRATION_PARTS_PER_BILLION,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
}
class OzoneConcentrationConverter(BaseUnitConverter):
"""Convert ozone ratio to mass per volume."""
UNIT_CLASS = "ozone"
_UNIT_CONVERSION: dict[str | None, float] = {
CONCENTRATION_PARTS_PER_BILLION: 1e9,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: (
_OZONE_MOLAR_MASS / _AMBIENT_IDEAL_GAS_MOLAR_VOLUME * 1e6
),
}
VALID_UNITS = {
CONCENTRATION_PARTS_PER_BILLION,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
UnitOfApparentPower.MILLIVOLT_AMPERE,
UnitOfApparentPower.VOLT_AMPERE,
UnitOfApparentPower.KILO_VOLT_AMPERE,
}
@@ -603,6 +563,22 @@ class ReactivePowerConverter(BaseUnitConverter):
}
class OzoneConcentrationConverter(BaseUnitConverter):
"""Convert ozone ratio to mass per volume."""
UNIT_CLASS = "ozone"
_UNIT_CONVERSION: dict[str | None, float] = {
CONCENTRATION_PARTS_PER_BILLION: 1e9,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: (
_OZONE_MOLAR_MASS / _AMBIENT_IDEAL_GAS_MOLAR_VOLUME * 1e6
),
}
VALID_UNITS = {
CONCENTRATION_PARTS_PER_BILLION,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
}
class SpeedConverter(BaseUnitConverter):
"""Utility to convert speed values."""
@@ -703,22 +679,6 @@ class SpeedConverter(BaseUnitConverter):
return float(0.836 * beaufort ** (3 / 2))
class SulphurDioxideConcentrationConverter(BaseUnitConverter):
"""Convert sulphur dioxide ratio to mass per volume."""
UNIT_CLASS = "sulphur_dioxide"
_UNIT_CONVERSION: dict[str | None, float] = {
CONCENTRATION_PARTS_PER_BILLION: 1e9,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: (
_SULPHUR_DIOXIDE_MOLAR_MASS / _AMBIENT_IDEAL_GAS_MOLAR_VOLUME * 1e6
),
}
VALID_UNITS = {
CONCENTRATION_PARTS_PER_BILLION,
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
}
class TemperatureConverter(BaseUnitConverter):
"""Utility to convert temperature values."""
@@ -889,6 +849,22 @@ class UnitlessRatioConverter(BaseUnitConverter):
}
class MassVolumeConcentrationConverter(BaseUnitConverter):
"""Utility to convert mass volume concentration values."""
UNIT_CLASS = "concentration"
_UNIT_CONVERSION: dict[str | None, float] = {
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER: 1000000.0, # 1000 µg/m³ = 1 mg/m³
CONCENTRATION_MILLIGRAMS_PER_CUBIC_METER: 1000.0, # 1000 mg/m³ = 1 g/m³
CONCENTRATION_GRAMS_PER_CUBIC_METER: 1.0,
}
VALID_UNITS = {
CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
CONCENTRATION_MILLIGRAMS_PER_CUBIC_METER,
CONCENTRATION_GRAMS_PER_CUBIC_METER,
}
class VolumeConverter(BaseUnitConverter):
"""Utility to convert volume values."""
@@ -951,3 +927,27 @@ class VolumeFlowRateConverter(BaseUnitConverter):
UnitOfVolumeFlowRate.GALLONS_PER_DAY,
UnitOfVolumeFlowRate.MILLILITERS_PER_SECOND,
}
class DurationConverter(BaseUnitConverter):
"""Utility to convert duration values."""
UNIT_CLASS = "duration"
_UNIT_CONVERSION: dict[str | None, float] = {
UnitOfTime.MICROSECONDS: 1000000,
UnitOfTime.MILLISECONDS: 1000,
UnitOfTime.SECONDS: 1,
UnitOfTime.MINUTES: 1 / _MIN_TO_SEC,
UnitOfTime.HOURS: 1 / _HRS_TO_SECS,
UnitOfTime.DAYS: 1 / _DAYS_TO_SECS,
UnitOfTime.WEEKS: 1 / (7 * _DAYS_TO_SECS),
}
VALID_UNITS = {
UnitOfTime.MICROSECONDS,
UnitOfTime.MILLISECONDS,
UnitOfTime.SECONDS,
UnitOfTime.MINUTES,
UnitOfTime.HOURS,
UnitOfTime.DAYS,
UnitOfTime.WEEKS,
}

View File

@@ -676,7 +676,7 @@ exclude_lines = [
]
[tool.ruff]
required-version = ">=0.14.13"
required-version = ">=0.13.0"
[tool.ruff.lint]
select = [

6
requirements_all.txt generated
View File

@@ -1184,7 +1184,7 @@ hassil==3.5.0
hdate[astral]==1.1.2
# homeassistant.components.hdfury
hdfury==1.4.2
hdfury==1.3.1
# homeassistant.components.heatmiser
heatmiserV3==2.0.4
@@ -1909,7 +1909,7 @@ pyatag==0.3.5.3
pyatmo==9.2.3
# homeassistant.components.apple_tv
pyatv==0.17.0
pyatv==0.16.1;python_version<'3.14'
# homeassistant.components.aussie_broadband
pyaussiebb==0.1.5
@@ -3080,7 +3080,7 @@ typedmonarchmoney==0.4.4
uasiren==0.0.1
# homeassistant.components.unifiprotect
uiprotect==10.0.1
uiprotect==10.0.0
# homeassistant.components.landisgyr_heat_meter
ultraheat-api==0.5.7

View File

@@ -1051,7 +1051,7 @@ hassil==3.5.0
hdate[astral]==1.1.2
# homeassistant.components.hdfury
hdfury==1.4.2
hdfury==1.3.1
# homeassistant.components.here_travel_time
here-routing==1.2.0
@@ -1637,7 +1637,7 @@ pyatag==0.3.5.3
pyatmo==9.2.3
# homeassistant.components.apple_tv
pyatv==0.17.0
pyatv==0.16.1;python_version<'3.14'
# homeassistant.components.aussie_broadband
pyaussiebb==0.1.5
@@ -2577,7 +2577,7 @@ typedmonarchmoney==0.4.4
uasiren==0.0.1
# homeassistant.components.unifiprotect
uiprotect==10.0.1
uiprotect==10.0.0
# homeassistant.components.landisgyr_heat_meter
ultraheat-api==0.5.7

View File

@@ -1,5 +1,5 @@
# Automatically generated from .pre-commit-config.yaml by gen_requirements_all.py, do not edit
codespell==2.4.1
ruff==0.14.13
ruff==0.13.0
yamllint==1.37.1

View File

@@ -26,7 +26,7 @@ RUN --mount=from=ghcr.io/astral-sh/uv:0.9.17,source=/uv,target=/bin/uv \
-r /usr/src/homeassistant/requirements.txt \
pipdeptree==2.26.1 \
tqdm==4.67.1 \
ruff==0.14.13
ruff==0.13.0
LABEL "name"="hassfest"
LABEL "maintainer"="Home Assistant <hello@home-assistant.io>"

View File

@@ -117,6 +117,7 @@ FORBIDDEN_PACKAGE_EXCEPTIONS: dict[str, dict[str, set[str]]] = {
"airthings": {"airthings-cloud": {"async-timeout"}},
"ampio": {"asmog": {"async-timeout"}},
"apache_kafka": {"aiokafka": {"async-timeout"}},
"apple_tv": {"pyatv": {"async-timeout"}},
"blackbird": {
# https://github.com/koolsb/pyblackbird/issues/12
# pyblackbird > pyserial-asyncio

View File

@@ -1,6 +1,9 @@
"""Tests for Apple TV."""
import sys
import pytest
# Make asserts in the common module display differences
pytest.register_assert_rewrite("tests.components.apple_tv.common")
if sys.version_info < (3, 14):
# Make asserts in the common module display differences
pytest.register_assert_rewrite("tests.components.apple_tv.common")

View File

@@ -1,14 +1,20 @@
"""Fixtures for component."""
from collections.abc import Generator
import sys
from unittest.mock import AsyncMock, MagicMock, patch
from pyatv import conf
from pyatv.const import PairingRequirement, Protocol
from pyatv.support import http
import pytest
from .common import MockPairingHandler, airplay_service, create_conf, mrp_service
if sys.version_info < (3, 14):
from pyatv import conf
from pyatv.const import PairingRequirement, Protocol
from pyatv.support import http
from .common import MockPairingHandler, airplay_service, create_conf, mrp_service
if sys.version_info >= (3, 14):
collect_ignore_glob = ["test_*.py"]
@pytest.fixture(autouse=True, name="mock_scan")

View File

@@ -1,13 +1 @@
"""Tests for the GitHub integration."""
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def setup_integration(hass: HomeAssistant, config_entry: MockConfigEntry) -> None:
"""Method for setting up the component."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()

View File

@@ -0,0 +1,53 @@
"""Common helpers for GitHub integration tests."""
from __future__ import annotations
import json
from homeassistant.components.github.const import CONF_REPOSITORIES, DOMAIN
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry, async_load_fixture
from tests.test_util.aiohttp import AiohttpClientMocker
MOCK_ACCESS_TOKEN = "gho_16C7e42F292c6912E7710c838347Ae178B4a"
TEST_REPOSITORY = "octocat/Hello-World"
async def setup_github_integration(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
aioclient_mock: AiohttpClientMocker,
add_entry_to_hass: bool = True,
) -> None:
"""Mock setting up the integration."""
headers = json.loads(await async_load_fixture(hass, "base_headers.json", DOMAIN))
for idx, repository in enumerate(mock_config_entry.options[CONF_REPOSITORIES]):
aioclient_mock.get(
f"https://api.github.com/repos/{repository}",
json={
**json.loads(await async_load_fixture(hass, "repository.json", DOMAIN)),
"full_name": repository,
"id": idx,
},
headers=headers,
)
aioclient_mock.get(
f"https://api.github.com/repos/{repository}/events",
json=[],
headers=headers,
)
aioclient_mock.post(
"https://api.github.com/graphql",
json=json.loads(await async_load_fixture(hass, "graphql.json", DOMAIN)),
headers=headers,
)
if add_entry_to_hass:
mock_config_entry.add_to_hass(hass)
setup_result = await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert setup_result
assert mock_config_entry.state is ConfigEntryState.LOADED

View File

@@ -1,27 +1,18 @@
"""conftest for the GitHub integration."""
import asyncio
from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, patch
from unittest.mock import patch
from aiogithubapi import (
GitHubLoginDeviceModel,
GitHubLoginOauthModel,
GitHubRateLimitModel,
)
import pytest
from homeassistant.components.github.const import CONF_REPOSITORIES, DOMAIN
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from .const import MOCK_ACCESS_TOKEN, TEST_REPOSITORY
from .common import MOCK_ACCESS_TOKEN, TEST_REPOSITORY, setup_github_integration
from tests.common import (
MockConfigEntry,
async_load_json_object_fixture,
load_json_object_fixture,
)
from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
@pytest.fixture
@@ -43,93 +34,11 @@ def mock_setup_entry() -> Generator[None]:
@pytest.fixture
def device_activation_event() -> asyncio.Event:
"""Fixture to provide an asyncio event for device activation."""
return asyncio.Event()
@pytest.fixture
def github_device_client(
async def init_integration(
hass: HomeAssistant,
device_activation_event: asyncio.Event,
) -> Generator[AsyncMock]:
"""Mock GitHub device client."""
with patch(
"homeassistant.components.github.config_flow.GitHubDeviceAPI",
autospec=True,
) as github_client_mock:
client = github_client_mock.return_value
register_object = AsyncMock()
register_object.data = GitHubLoginDeviceModel(
load_json_object_fixture("device_register.json", DOMAIN)
)
client.register.return_value = register_object
async def mock_api_device_activation(device_code) -> AsyncMock:
# Simulate the device activation process
await device_activation_event.wait()
activate_object = AsyncMock()
activate_object.data = GitHubLoginOauthModel(
await async_load_json_object_fixture(
hass, "device_activate.json", DOMAIN
)
)
return activate_object
client.activation = mock_api_device_activation
yield client
@pytest.fixture
def github_client(hass: HomeAssistant) -> Generator[AsyncMock]:
"""Mock GitHub device client."""
with (
patch(
"homeassistant.components.github.config_flow.GitHubAPI",
autospec=True,
) as github_client_mock,
patch("homeassistant.components.github.GitHubAPI", new=github_client_mock),
patch(
"homeassistant.components.github.diagnostics.GitHubAPI",
new=github_client_mock,
),
):
client = github_client_mock.return_value
client.user.starred = AsyncMock(
side_effect=[
MagicMock(
is_last_page=False,
next_page_number=2,
last_page_number=2,
data=[MagicMock(full_name="home-assistant/core")],
),
MagicMock(
is_last_page=True,
data=[MagicMock(full_name="home-assistant/frontend")],
),
]
)
client.user.repos = AsyncMock(
side_effect=[
MagicMock(
is_last_page=False,
next_page_number=2,
last_page_number=2,
data=[MagicMock(full_name="home-assistant/operating-system")],
),
MagicMock(
is_last_page=True,
data=[MagicMock(full_name="esphome/esphome")],
),
]
)
rate_limit_mock = AsyncMock()
rate_limit_mock.data = GitHubRateLimitModel(
load_json_object_fixture("rate_limit.json", DOMAIN)
)
client.rate_limit.return_value = rate_limit_mock
graphql_mock = AsyncMock()
graphql_mock.data = load_json_object_fixture("graphql.json", DOMAIN)
client.graphql.return_value = graphql_mock
client.repos.events.subscribe = AsyncMock()
yield client
mock_config_entry: MockConfigEntry,
aioclient_mock: AiohttpClientMocker,
) -> MockConfigEntry:
"""Set up the GitHub integration for testing."""
await setup_github_integration(hass, mock_config_entry, aioclient_mock)
return mock_config_entry

View File

@@ -1,4 +0,0 @@
"""Constants for GitHub integration tests."""
MOCK_ACCESS_TOKEN = "gho_16C7e42F292c6912E7710c838347Ae178B4a"
TEST_REPOSITORY = "octocat/Hello-World"

View File

@@ -0,0 +1,29 @@
{
"Server": "GitHub.com",
"Date": "Mon, 1 Jan 1970 00:00:00 GMT",
"Content-Type": "application/json; charset=utf-8",
"Transfer-Encoding": "chunked",
"Cache-Control": "private, max-age=60, s-maxage=60",
"Vary": "Accept, Authorization, Cookie, X-GitHub-OTP",
"Etag": "W/\"1234567890abcdefghijklmnopqrstuvwxyz\"",
"X-OAuth-Scopes": "",
"X-Accepted-OAuth-Scopes": "",
"github-authentication-token-expiration": "1970-01-01 01:00:00 UTC",
"X-GitHub-Media-Type": "github.v3; param=raw; format=json",
"X-RateLimit-Limit": "5000",
"X-RateLimit-Remaining": "4999",
"X-RateLimit-Reset": "1",
"X-RateLimit-Used": "1",
"X-RateLimit-Resource": "core",
"Access-Control-Expose-Headers": "ETag, Link, Location, Retry-After, X-GitHub-OTP, X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Used, X-RateLimit-Resource, X-RateLimit-Reset, X-OAuth-Scopes, X-Accepted-OAuth-Scopes, X-Poll-Interval, X-GitHub-Media-Type, Deprecation, Sunset",
"Access-Control-Allow-Origin": "*",
"Strict-Transport-Security": "max-age=31536000; includeSubdomains; preload",
"X-Frame-Options": "deny",
"X-Content-Type-Options": "nosniff",
"X-XSS-Protection": "0",
"Referrer-Policy": "origin-when-cross-origin, strict-origin-when-cross-origin",
"Content-Security-Policy": "default-src 'none'",
"Content-Encoding": "gzip",
"Permissions-Policy": "",
"X-GitHub-Request-Id": "12A3:45BC:6D7890:12EF34:5678G901"
}

View File

@@ -1,5 +0,0 @@
{
"access_token": "gho_16C7e42F292c6912E7710c838347Ae178B4a",
"token_type": "bearer",
"scope": ""
}

View File

@@ -1,7 +0,0 @@
{
"device_code": "3584d83530557fdd1f46af8289938c8ef79f9dc5",
"user_code": "WDJB-MJHT",
"verification_uri": "https://github.com/login/device",
"expires_in": 900,
"interval": 5
}

View File

@@ -1 +0,0 @@
{ "resources": { "core": { "remaining": 100, "limit": 100 } } }

View File

@@ -1,100 +1,146 @@
"""Test the GitHub config flow."""
import asyncio
from unittest.mock import AsyncMock, MagicMock
from unittest.mock import AsyncMock, MagicMock, patch
from aiogithubapi import GitHubException
from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant import config_entries
from homeassistant.components.github.config_flow import get_repositories
from homeassistant.components.github.const import (
CONF_REPOSITORIES,
DEFAULT_REPOSITORIES,
DOMAIN,
)
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType, UnknownFlow
from .const import MOCK_ACCESS_TOKEN
from .common import MOCK_ACCESS_TOKEN
from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
async def test_full_user_flow_implementation(
hass: HomeAssistant,
mock_setup_entry: None,
github_device_client: AsyncMock,
github_client: AsyncMock,
device_activation_event: asyncio.Event,
aioclient_mock: AiohttpClientMocker,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test the full manual user flow from start to finish."""
aioclient_mock.post(
"https://github.com/login/device/code",
json={
"device_code": "3584d83530557fdd1f46af8289938c8ef79f9dc5",
"user_code": "WDJB-MJHT",
"verification_uri": "https://github.com/login/device",
"expires_in": 900,
"interval": 5,
},
headers={"Content-Type": "application/json"},
)
# User has not yet entered the code
aioclient_mock.post(
"https://github.com/login/oauth/access_token",
json={"error": "authorization_pending"},
headers={"Content-Type": "application/json"},
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
assert result["step_id"] == "device"
assert result["type"] is FlowResultType.SHOW_PROGRESS
device_activation_event.set()
# User enters the code
aioclient_mock.clear_requests()
aioclient_mock.post(
"https://github.com/login/oauth/access_token",
json={
CONF_ACCESS_TOKEN: MOCK_ACCESS_TOKEN,
"token_type": "bearer",
"scope": "",
},
headers={"Content-Type": "application/json"},
)
freezer.tick(10)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["step_id"] == "repositories"
assert result["type"] is FlowResultType.FORM
assert not result["errors"]
schema = result["data_schema"]
repositories = schema.schema[CONF_REPOSITORIES].options
assert len(repositories) == 4
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_REPOSITORIES: DEFAULT_REPOSITORIES}
result["flow_id"],
user_input={
CONF_REPOSITORIES: DEFAULT_REPOSITORIES,
},
)
assert result["title"] == ""
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {CONF_ACCESS_TOKEN: MOCK_ACCESS_TOKEN}
assert result["options"] == {CONF_REPOSITORIES: DEFAULT_REPOSITORIES}
assert "data" in result
assert result["data"][CONF_ACCESS_TOKEN] == MOCK_ACCESS_TOKEN
assert "options" in result
assert result["options"][CONF_REPOSITORIES] == DEFAULT_REPOSITORIES
async def test_flow_with_registration_failure(
hass: HomeAssistant,
github_device_client: AsyncMock,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test flow with registration failure of the device."""
github_device_client.register.side_effect = GitHubException("Registration failed")
aioclient_mock.post(
"https://github.com/login/device/code",
exc=GitHubException("Registration failed"),
)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "could_not_register"
assert result.get("reason") == "could_not_register"
async def test_flow_with_activation_failure(
hass: HomeAssistant,
github_device_client: AsyncMock,
device_activation_event: asyncio.Event,
aioclient_mock: AiohttpClientMocker,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test flow with activation failure of the device."""
async def mock_api_device_activation(device_code) -> None:
# Simulate the device activation process
await device_activation_event.wait()
raise GitHubException("Activation failed")
github_device_client.activation = mock_api_device_activation
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
aioclient_mock.post(
"https://github.com/login/device/code",
json={
"device_code": "3584d83530557fdd1f46af8289938c8ef79f9dc5",
"user_code": "WDJB-MJHT",
"verification_uri": "https://github.com/login/device",
"expires_in": 900,
"interval": 5,
},
headers={"Content-Type": "application/json"},
)
# User has not yet entered the code
aioclient_mock.post(
"https://github.com/login/oauth/access_token",
json={"error": "authorization_pending"},
headers={"Content-Type": "application/json"},
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
assert result["step_id"] == "device"
assert result["type"] is FlowResultType.SHOW_PROGRESS
device_activation_event.set()
# Activation fails
aioclient_mock.clear_requests()
aioclient_mock.post(
"https://github.com/login/oauth/access_token",
exc=GitHubException("Activation failed"),
)
freezer.tick(10)
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
@@ -103,14 +149,30 @@ async def test_flow_with_activation_failure(
async def test_flow_with_remove_while_activating(
hass: HomeAssistant, github_device_client: AsyncMock
hass: HomeAssistant,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test flow with user canceling while activating."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
aioclient_mock.post(
"https://github.com/login/device/code",
json={
"device_code": "3584d83530557fdd1f46af8289938c8ef79f9dc5",
"user_code": "WDJB-MJHT",
"verification_uri": "https://github.com/login/device",
"expires_in": 900,
"interval": 5,
},
headers={"Content-Type": "application/json"},
)
aioclient_mock.post(
"https://github.com/login/oauth/access_token",
json={"error": "authorization_pending"},
headers={"Content-Type": "application/json"},
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
assert result["step_id"] == "device"
assert result["type"] is FlowResultType.SHOW_PROGRESS
@@ -132,88 +194,84 @@ async def test_already_configured(
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
DOMAIN,
context={"source": config_entries.SOURCE_USER},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
assert result.get("reason") == "already_configured"
async def test_no_repositories(
hass: HomeAssistant,
mock_setup_entry: None,
github_device_client: AsyncMock,
github_client: AsyncMock,
device_activation_event: asyncio.Event,
) -> None:
"""Test the full manual user flow from start to finish."""
async def test_starred_pagination_with_paginated_result(hass: HomeAssistant) -> None:
"""Test pagination of starred repositories with paginated result."""
with patch(
"homeassistant.components.github.config_flow.GitHubAPI",
return_value=MagicMock(
user=MagicMock(
starred=AsyncMock(
return_value=MagicMock(
is_last_page=False,
next_page_number=2,
last_page_number=2,
data=[MagicMock(full_name="home-assistant/core")],
)
),
repos=AsyncMock(
return_value=MagicMock(
is_last_page=False,
next_page_number=2,
last_page_number=2,
data=[MagicMock(full_name="awesome/reposiotry")],
)
),
)
),
):
repos = await get_repositories(hass, MOCK_ACCESS_TOKEN)
github_client.user.repos.side_effect = [MagicMock(is_last_page=True, data=[])]
github_client.user.starred.side_effect = [MagicMock(is_last_page=True, data=[])]
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["step_id"] == "device"
assert result["type"] is FlowResultType.SHOW_PROGRESS
device_activation_event.set()
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["step_id"] == "repositories"
assert result["type"] is FlowResultType.FORM
assert not result["errors"]
schema = result["data_schema"]
repositories = schema.schema[CONF_REPOSITORIES].options
assert len(repositories) == 2
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_REPOSITORIES: DEFAULT_REPOSITORIES}
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert len(repos) == 2
assert repos[-1] == DEFAULT_REPOSITORIES[0]
async def test_exception_during_repository_fetch(
hass: HomeAssistant,
mock_setup_entry: None,
github_device_client: AsyncMock,
github_client: AsyncMock,
device_activation_event: asyncio.Event,
) -> None:
"""Test the full manual user flow from start to finish."""
async def test_starred_pagination_with_no_starred(hass: HomeAssistant) -> None:
"""Test pagination of starred repositories with no starred."""
with patch(
"homeassistant.components.github.config_flow.GitHubAPI",
return_value=MagicMock(
user=MagicMock(
starred=AsyncMock(
return_value=MagicMock(
is_last_page=True,
data=[],
)
),
repos=AsyncMock(
return_value=MagicMock(
is_last_page=True,
data=[],
)
),
)
),
):
repos = await get_repositories(hass, MOCK_ACCESS_TOKEN)
github_client.user.repos.side_effect = GitHubException()
assert len(repos) == 2
assert repos == DEFAULT_REPOSITORIES
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["step_id"] == "device"
assert result["type"] is FlowResultType.SHOW_PROGRESS
async def test_starred_pagination_with_exception(hass: HomeAssistant) -> None:
"""Test pagination of starred repositories with exception."""
with patch(
"homeassistant.components.github.config_flow.GitHubAPI",
return_value=MagicMock(
user=MagicMock(starred=AsyncMock(side_effect=GitHubException("Error")))
),
):
repos = await get_repositories(hass, MOCK_ACCESS_TOKEN)
device_activation_event.set()
await hass.async_block_till_done()
result = await hass.config_entries.flow.async_configure(result["flow_id"])
assert result["step_id"] == "repositories"
assert result["type"] is FlowResultType.FORM
assert not result["errors"]
schema = result["data_schema"]
repositories = schema.schema[CONF_REPOSITORIES].options
assert len(repositories) == 2
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_REPOSITORIES: DEFAULT_REPOSITORIES}
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert len(repos) == 2
assert repos == DEFAULT_REPOSITORIES
async def test_options_flow(

View File

@@ -1,56 +1,89 @@
"""Test GitHub diagnostics."""
from unittest.mock import AsyncMock
import json
from aiogithubapi import GitHubException
import pytest
from homeassistant.components.github.const import CONF_REPOSITORIES, DOMAIN
from homeassistant.core import HomeAssistant
from . import setup_integration
from .common import setup_github_integration
from tests.common import MockConfigEntry
from tests.common import MockConfigEntry, async_load_fixture
from tests.components.diagnostics import get_diagnostics_for_config_entry
from tests.test_util.aiohttp import AiohttpClientMocker
from tests.typing import ClientSessionGenerator
# This tests needs to be adjusted to remove lingering tasks
@pytest.mark.parametrize("expected_lingering_tasks", [True])
async def test_entry_diagnostics(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
github_client: AsyncMock,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test config entry diagnostics."""
await setup_integration(hass, mock_config_entry)
mock_config_entry.add_to_hass(hass)
hass.config_entries.async_update_entry(
mock_config_entry,
options={CONF_REPOSITORIES: ["home-assistant/core"]},
)
response_json = json.loads(await async_load_fixture(hass, "graphql.json", DOMAIN))
response_json["data"]["repository"]["full_name"] = "home-assistant/core"
aioclient_mock.post(
"https://api.github.com/graphql",
json=response_json,
headers=json.loads(await async_load_fixture(hass, "base_headers.json", DOMAIN)),
)
aioclient_mock.get(
"https://api.github.com/rate_limit",
json={"resources": {"core": {"remaining": 100, "limit": 100}}},
headers={"Content-Type": "application/json"},
)
await setup_github_integration(
hass, mock_config_entry, aioclient_mock, add_entry_to_hass=False
)
result = await get_diagnostics_for_config_entry(
hass,
hass_client,
mock_config_entry,
)
assert result["options"]["repositories"] == ["octocat/Hello-World"]
assert result["options"]["repositories"] == ["home-assistant/core"]
assert result["rate_limit"] == {
"resources": {"core": {"remaining": 100, "limit": 100}}
}
assert (
result["repositories"]["octocat/Hello-World"]["full_name"]
== "octocat/Hello-World"
result["repositories"]["home-assistant/core"]["full_name"]
== "home-assistant/core"
)
# This tests needs to be adjusted to remove lingering tasks
@pytest.mark.parametrize("expected_lingering_tasks", [True])
async def test_entry_diagnostics_exception(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
github_client: AsyncMock,
init_integration: MockConfigEntry,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test config entry diagnostics with exception for ratelimit."""
await setup_integration(hass, mock_config_entry)
github_client.rate_limit.side_effect = GitHubException("error")
aioclient_mock.get(
"https://api.github.com/rate_limit",
exc=GitHubException("error"),
)
result = await get_diagnostics_for_config_entry(
hass,
hass_client,
mock_config_entry,
init_integration,
)
assert result["rate_limit"]["error"] == "error"
assert (
result["rate_limit"]["error"]
== "Unexpected exception for 'https://api.github.com/rate_limit' with - error"
)

View File

@@ -1,23 +1,24 @@
"""Test the GitHub init file."""
from unittest.mock import AsyncMock
import pytest
from homeassistant.components.github import CONF_REPOSITORIES
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er, icon
from . import setup_integration
from .common import setup_github_integration
from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
# This tests needs to be adjusted to remove lingering tasks
@pytest.mark.parametrize("expected_lingering_tasks", [True])
async def test_device_registry_cleanup(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
mock_config_entry: MockConfigEntry,
github_client: AsyncMock,
aioclient_mock: AiohttpClientMocker,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test that we remove untracked repositories from the device registry."""
@@ -26,7 +27,9 @@ async def test_device_registry_cleanup(
mock_config_entry,
options={CONF_REPOSITORIES: ["home-assistant/core"]},
)
await setup_integration(hass, mock_config_entry)
await setup_github_integration(
hass, mock_config_entry, aioclient_mock, add_entry_to_hass=False
)
devices = dr.async_entries_for_config_entry(
registry=device_registry,
@@ -55,10 +58,12 @@ async def test_device_registry_cleanup(
assert len(devices) == 0
# This tests needs to be adjusted to remove lingering tasks
@pytest.mark.parametrize("expected_lingering_tasks", [True])
async def test_subscription_setup(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
github_client: AsyncMock,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that we setup event subscription."""
mock_config_entry.add_to_hass(hass)
@@ -67,14 +72,21 @@ async def test_subscription_setup(
options={CONF_REPOSITORIES: ["home-assistant/core"]},
pref_disable_polling=False,
)
await setup_integration(hass, mock_config_entry)
github_client.repos.events.subscribe.assert_called_once()
await setup_github_integration(
hass, mock_config_entry, aioclient_mock, add_entry_to_hass=False
)
assert (
"https://api.github.com/repos/home-assistant/core/events" in x[1]
for x in aioclient_mock.mock_calls
)
# This tests needs to be adjusted to remove lingering tasks
@pytest.mark.parametrize("expected_lingering_tasks", [True])
async def test_subscription_setup_polling_disabled(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
github_client: AsyncMock,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that we do not setup event subscription if polling is disabled."""
mock_config_entry.add_to_hass(hass)
@@ -83,8 +95,13 @@ async def test_subscription_setup_polling_disabled(
options={CONF_REPOSITORIES: ["home-assistant/core"]},
pref_disable_polling=True,
)
await setup_integration(hass, mock_config_entry)
github_client.repos.events.subscribe.assert_not_called()
await setup_github_integration(
hass, mock_config_entry, aioclient_mock, add_entry_to_hass=False
)
assert (
"https://api.github.com/repos/home-assistant/core/events" not in x[1]
for x in aioclient_mock.mock_calls
)
# Prove that we subscribed if the user enabled polling again
hass.config_entries.async_update_entry(
@@ -92,20 +109,23 @@ async def test_subscription_setup_polling_disabled(
)
assert await hass.config_entries.async_reload(mock_config_entry.entry_id)
await hass.async_block_till_done()
github_client.repos.events.subscribe.assert_called_once()
assert (
"https://api.github.com/repos/home-assistant/core/events" in x[1]
for x in aioclient_mock.mock_calls
)
# This tests needs to be adjusted to remove lingering tasks
@pytest.mark.parametrize("expected_lingering_tasks", [True])
async def test_sensor_icons(
hass: HomeAssistant,
github_client: AsyncMock,
mock_config_entry: MockConfigEntry,
init_integration: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test to ensure that all sensor entities have an icon definition."""
await setup_integration(hass, mock_config_entry)
entities = er.async_entries_for_config_entry(
entity_registry,
config_entry_id=mock_config_entry.entry_id,
config_entry_id=init_integration.entry_id,
)
icons = await icon.async_get_icons(hass, "entity", integrations=["github"])

View File

@@ -1,36 +1,50 @@
"""Test GitHub sensor."""
from unittest.mock import AsyncMock
import json
from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant.components.github.const import FALLBACK_UPDATE_INTERVAL
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.components.github.const import DOMAIN, FALLBACK_UPDATE_INTERVAL
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
from . import setup_integration
from .common import TEST_REPOSITORY
from tests.common import MockConfigEntry, async_fire_time_changed
from tests.common import MockConfigEntry, async_fire_time_changed, async_load_fixture
from tests.test_util.aiohttp import AiohttpClientMocker
TEST_SENSOR_ENTITY = "sensor.octocat_hello_world_latest_release"
# This tests needs to be adjusted to remove lingering tasks
@pytest.mark.parametrize("expected_lingering_tasks", [True])
async def test_sensor_updates_with_empty_release_array(
hass: HomeAssistant,
github_client: AsyncMock,
mock_config_entry: MockConfigEntry,
freezer: FrozenDateTimeFactory,
init_integration: MockConfigEntry,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test the sensor updates by default GitHub sensors."""
await setup_integration(hass, mock_config_entry)
state = hass.states.get(TEST_SENSOR_ENTITY)
assert state.state == "v1.0.0"
github_client.graphql.return_value.data["data"]["repository"]["release"] = None
response_json = json.loads(await async_load_fixture(hass, "graphql.json", DOMAIN))
response_json["data"]["repository"]["release"] = None
headers = json.loads(await async_load_fixture(hass, "base_headers.json", DOMAIN))
freezer.tick(FALLBACK_UPDATE_INTERVAL)
async_fire_time_changed(hass)
aioclient_mock.clear_requests()
aioclient_mock.get(
f"https://api.github.com/repos/{TEST_REPOSITORY}/events",
json=[],
headers=headers,
)
aioclient_mock.post(
"https://api.github.com/graphql",
json=response_json,
headers=headers,
)
async_fire_time_changed(hass, dt_util.utcnow() + FALLBACK_UPDATE_INTERVAL)
await hass.async_block_till_done()
new_state = hass.states.get(TEST_SENSOR_ENTITY)
assert new_state.state == STATE_UNAVAILABLE
assert new_state.state == "unavailable"

View File

@@ -484,14 +484,14 @@
'object_id_base': 'Ozone',
'options': dict({
}),
'original_device_class': <SensorDeviceClass.OZONE: 'ozone'>,
'original_device_class': None,
'original_icon': None,
'original_name': 'Ozone',
'platform': 'google_air_quality',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'translation_key': 'ozone',
'unique_id': 'o3_10.1_20.1',
'unit_of_measurement': 'ppb',
})
@@ -500,7 +500,6 @@
StateSnapshot({
'attributes': ReadOnlyDict({
'attribution': 'Data provided by Google Air Quality',
'device_class': 'ozone',
'friendly_name': 'Home Ozone',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': 'ppb',

View File

@@ -57,7 +57,6 @@ def mock_aiontfy() -> Generator[AsyncMock]:
actions=[],
attachment=None,
content_type=None,
sequence_id="Mc3otamDNcpJ",
)
resp.to_dict.return_value = {
@@ -75,7 +74,6 @@ def mock_aiontfy() -> Generator[AsyncMock]:
"actions": [],
"attachment": None,
"content_type": None,
"sequence_id": "Mc3otamDNcpJ",
}
async def mock_ws(

View File

@@ -59,7 +59,6 @@
'id': 'h6Y2hKA5sy0U',
'message': 'Hello',
'priority': 3,
'sequence_id': 'Mc3otamDNcpJ',
'tags': list([
'octopus',
]),

View File

@@ -101,7 +101,6 @@ async def test_event(
"time": datetime(2025, 3, 28, 17, 58, 46, tzinfo=UTC),
"title": "Title",
"topic": "mytopic",
"sequence_id": "Mc3otamDNcpJ",
}

View File

@@ -22,7 +22,6 @@ from homeassistant.components.ntfy.notify import (
ATTR_ICON,
ATTR_MARKDOWN,
ATTR_PRIORITY,
ATTR_SEQUENCE_ID,
ATTR_TAGS,
SERVICE_PUBLISH,
)
@@ -61,7 +60,6 @@ async def test_ntfy_publish(
ATTR_MARKDOWN: True,
ATTR_PRIORITY: "5",
ATTR_TAGS: ["partying_face", "grin"],
ATTR_SEQUENCE_ID: "Mc3otamDNcpJ",
},
blocking=True,
)
@@ -78,7 +76,6 @@ async def test_ntfy_publish(
markdown=True,
icon=URL("https://example.org/logo.png"),
delay="86430.0s",
sequence_id="Mc3otamDNcpJ",
)
)

View File

@@ -52,13 +52,10 @@ def make_test_trigger(*entities: str) -> dict:
async def async_trigger(
hass: HomeAssistant,
entity_id: str,
state: str | None = None,
attributes: dict | None = None,
hass: HomeAssistant, entity_id: str, state: str | None = None
) -> None:
"""Trigger a state change."""
hass.states.async_set(entity_id, state, attributes)
hass.states.async_set(entity_id, state)
await hass.async_block_till_done()

View File

@@ -236,8 +236,8 @@ BINARY_SENSOR_OPTIONS = {
"on",
{"one": "on", "two": "off"},
{},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}"},
{"options": "{{ ['off', 'on', 'auto'] }}"},
{},
),
(
@@ -458,8 +458,8 @@ async def test_config_flow(
(
"select",
{"state": "{{ states('select.one') }}"},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}"},
{"options": "{{ ['off', 'on', 'auto'] }}"},
),
(
"update",
@@ -734,8 +734,8 @@ async def test_config_flow_device(
{"state": "{{ states('select.two') }}"},
["on", "off"],
{"one": "on", "two": "off"},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}"},
{"options": "{{ ['off', 'on', 'auto'] }}"},
"state",
),
(
@@ -1606,8 +1606,8 @@ async def test_option_flow_sensor_preview_config_entry_removed(
(
"select",
{"state": "{{ states('select.one') }}"},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}", "select_option": []},
{"options": "{{ ['off', 'on', 'auto'] }}"},
{"options": "{{ ['off', 'on', 'auto'] }}"},
),
(
"switch",

View File

@@ -405,12 +405,10 @@ async def async_yaml_patch_helper(hass: HomeAssistant, filename: str) -> None:
"name": "My template",
"state": "{{ 'on' }}",
"options": "{{ ['off', 'on', 'auto'] }}",
"select_option": [],
},
{
"state": "{{ 'on' }}",
"options": "{{ ['off', 'on', 'auto'] }}",
"select_option": [],
},
),
(

View File

@@ -5,7 +5,13 @@ from typing import Any
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components import number
from homeassistant import setup
from homeassistant.components import number, template
from homeassistant.components.input_number import (
ATTR_VALUE as INPUT_NUMBER_ATTR_VALUE,
DOMAIN as INPUT_NUMBER_DOMAIN,
SERVICE_SET_VALUE as INPUT_NUMBER_SERVICE_SET_VALUE,
)
from homeassistant.components.number import (
ATTR_MAX,
ATTR_MIN,
@@ -26,60 +32,104 @@ from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.core import Context, HomeAssistant, ServiceCall
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.setup import async_setup_component
from .conftest import (
ConfigurationStyle,
TemplatePlatformSetup,
async_get_flow_preview_state,
async_trigger,
make_test_trigger,
setup_and_test_nested_unique_id,
setup_and_test_unique_id,
setup_entity,
)
from .conftest import ConfigurationStyle, async_get_flow_preview_state
from tests.common import MockConfigEntry
from tests.common import MockConfigEntry, assert_setup_component, async_capture_events
from tests.typing import WebSocketGenerator
TEST_AVAILABILITY_ENTITY_ID = "binary_sensor.test_availability"
TEST_MAXIMUM_ENTITY_ID = "sensor.maximum"
TEST_MINIMUM_ENTITY_ID = "sensor.minimum"
_TEST_OBJECT_ID = "template_number"
_TEST_NUMBER = f"number.{_TEST_OBJECT_ID}"
# Represent for number's value
_VALUE_INPUT_NUMBER = "input_number.value"
# Represent for number's minimum
_MINIMUM_INPUT_NUMBER = "input_number.minimum"
# Represent for number's maximum
_MAXIMUM_INPUT_NUMBER = "input_number.maximum"
# Represent for number's step
_STEP_INPUT_NUMBER = "input_number.step"
# Config for `_VALUE_INPUT_NUMBER`
_VALUE_INPUT_NUMBER_CONFIG = {
"value": {
"min": 0.0,
"max": 100.0,
"name": "Value",
"step": 1.0,
"mode": "slider",
}
}
TEST_STATE_ENTITY_ID = "number.test_state"
TEST_STEP_ENTITY_ID = "sensor.step"
TEST_NUMBER = TemplatePlatformSetup(
number.DOMAIN,
None,
"template_number",
make_test_trigger(
TEST_AVAILABILITY_ENTITY_ID,
TEST_MAXIMUM_ENTITY_ID,
TEST_MINIMUM_ENTITY_ID,
TEST_STATE_ENTITY_ID,
TEST_STEP_ENTITY_ID,
),
)
TEST_SET_VALUE_ACTION = {
"action": "test.automation",
"data": {
"action": "set_value",
"caller": "{{ this.entity_id }}",
"value": "{{ value }}",
TEST_AVAILABILITY_ENTITY_ID = "binary_sensor.test_availability"
TEST_STATE_TRIGGER = {
"trigger": {
"trigger": "state",
"entity_id": [TEST_STATE_ENTITY_ID, TEST_AVAILABILITY_ENTITY_ID],
},
"variables": {"triggering_entity": "{{ trigger.entity_id }}"},
"action": [
{"event": "action_event", "event_data": {"what": "{{ triggering_entity }}"}}
],
}
TEST_REQUIRED = {"state": "0", "step": "1", "set_value": []}
async def async_setup_modern_format(
hass: HomeAssistant, count: int, number_config: dict[str, Any]
) -> None:
"""Do setup of number integration via new format."""
config = {"template": {"number": number_config}}
with assert_setup_component(count, template.DOMAIN):
assert await async_setup_component(
hass,
template.DOMAIN,
config,
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
async def async_setup_trigger_format(
hass: HomeAssistant, count: int, number_config: dict[str, Any]
) -> None:
"""Do setup of number integration via trigger format."""
config = {"template": {**TEST_STATE_TRIGGER, "number": number_config}}
with assert_setup_component(count, template.DOMAIN):
assert await async_setup_component(
hass,
template.DOMAIN,
config,
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
@pytest.fixture
async def setup_number(
hass: HomeAssistant,
count: int,
style: ConfigurationStyle,
config: dict[str, Any],
number_config: dict[str, Any],
) -> None:
"""Do setup of number integration."""
await setup_entity(hass, TEST_NUMBER, style, count, config)
if style == ConfigurationStyle.MODERN:
await async_setup_modern_format(
hass, count, {"name": _TEST_OBJECT_ID, **number_config}
)
if style == ConfigurationStyle.TRIGGER:
await async_setup_trigger_format(
hass, count, {"name": _TEST_OBJECT_ID, **number_config}
)
async def test_setup_config_entry(
@@ -116,135 +166,294 @@ async def test_setup_config_entry(
assert state == snapshot
@pytest.mark.parametrize(
("count", "config"),
[
(
1,
{
"state": "{{ 4 }}",
"set_value": {"service": "script.set_value"},
"step": "{{ 1 }}",
},
)
],
)
@pytest.mark.parametrize(
"style",
[ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
@pytest.mark.usefixtures("setup_number")
async def test_missing_optional_config(hass: HomeAssistant) -> None:
"""Test: missing optional template is ok."""
await async_trigger(hass, TEST_STATE_ENTITY_ID, "anything")
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"number": {
"state": "{{ 4 }}",
"set_value": {"service": "script.set_value"},
"step": "{{ 1 }}",
}
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
_verify(hass, 4, 1, 0.0, 100.0, None)
@pytest.mark.parametrize(
("count", "config"),
[
(
0,
{
"state": "{{ 4 }}",
},
)
],
)
@pytest.mark.parametrize(
"style",
[ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
@pytest.mark.usefixtures("setup_number")
async def test_missing_required_keys(hass: HomeAssistant) -> None:
"""Test: missing required fields will fail."""
with assert_setup_component(0, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"number": {
"state": "{{ 4 }}",
}
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
assert hass.states.async_all("number") == []
@pytest.mark.parametrize(
("count", "config"),
[
(
1,
{
"state": "{{ 4 }}",
"set_value": {"service": "script.set_value"},
"min": "{{ 3 }}",
"max": "{{ 5 }}",
"step": "{{ 1 }}",
"unit_of_measurement": "beer",
},
)
],
)
@pytest.mark.parametrize(
"style",
[ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
@pytest.mark.usefixtures("setup_number")
async def test_all_optional_config(hass: HomeAssistant) -> None:
"""Test: including all optional templates is ok."""
await async_trigger(hass, TEST_STATE_ENTITY_ID, "anything")
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"number": {
"state": "{{ 4 }}",
"set_value": {"service": "script.set_value"},
"min": "{{ 3 }}",
"max": "{{ 5 }}",
"step": "{{ 1 }}",
"unit_of_measurement": "beer",
}
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
_verify(hass, 4, 1, 3, 5, "beer")
@pytest.mark.parametrize(
("count", "config"),
[
(
1,
{
"state": f"{{{{ states('{TEST_STATE_ENTITY_ID}') | float(1.0) }}}}",
"step": f"{{{{ states('{TEST_STEP_ENTITY_ID}') | float(5.0) }}}}",
"min": f"{{{{ states('{TEST_MINIMUM_ENTITY_ID}') | float(0.0) }}}}",
"max": f"{{{{ states('{TEST_MAXIMUM_ENTITY_ID}') | float(100.0) }}}}",
"set_value": [TEST_SET_VALUE_ACTION],
},
)
],
)
@pytest.mark.parametrize(
"style",
[ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
@pytest.mark.usefixtures("setup_number")
async def test_template_number(
async def test_templates_with_entities(
hass: HomeAssistant, entity_registry: er.EntityRegistry, calls: list[ServiceCall]
) -> None:
"""Test templates with values from other entities."""
await async_trigger(hass, TEST_STATE_ENTITY_ID, 4)
await async_trigger(hass, TEST_STEP_ENTITY_ID, 1)
await async_trigger(hass, TEST_MINIMUM_ENTITY_ID, 3)
await async_trigger(hass, TEST_MAXIMUM_ENTITY_ID, 5)
with assert_setup_component(4, "input_number"):
assert await setup.async_setup_component(
hass,
"input_number",
{
"input_number": {
**_VALUE_INPUT_NUMBER_CONFIG,
"step": {
"min": 0.0,
"max": 100.0,
"name": "Step",
"step": 1.0,
"mode": "slider",
},
"minimum": {
"min": 0.0,
"max": 100.0,
"name": "Minimum",
"step": 1.0,
"mode": "slider",
},
"maximum": {
"min": 0.0,
"max": 100.0,
"name": "Maximum",
"step": 1.0,
"mode": "slider",
},
}
},
)
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"unique_id": "b",
"number": {
"state": f"{{{{ states('{_VALUE_INPUT_NUMBER}') }}}}",
"step": f"{{{{ states('{_STEP_INPUT_NUMBER}') }}}}",
"min": f"{{{{ states('{_MINIMUM_INPUT_NUMBER}') }}}}",
"max": f"{{{{ states('{_MAXIMUM_INPUT_NUMBER}') }}}}",
"set_value": [
{
"service": "input_number.set_value",
"data_template": {
"entity_id": _VALUE_INPUT_NUMBER,
"value": "{{ value }}",
},
},
{
"service": "test.automation",
"data_template": {
"action": "set_value",
"caller": "{{ this.entity_id }}",
"value": "{{ value }}",
},
},
],
"optimistic": True,
"unique_id": "a",
},
}
},
)
hass.states.async_set(_VALUE_INPUT_NUMBER, 4)
hass.states.async_set(_STEP_INPUT_NUMBER, 1)
hass.states.async_set(_MINIMUM_INPUT_NUMBER, 3)
hass.states.async_set(_MAXIMUM_INPUT_NUMBER, 5)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
entry = entity_registry.async_get(_TEST_NUMBER)
assert entry
assert entry.unique_id == "b-a"
_verify(hass, 4, 1, 3, 5, None)
await async_trigger(hass, TEST_STATE_ENTITY_ID, 5)
await hass.services.async_call(
INPUT_NUMBER_DOMAIN,
INPUT_NUMBER_SERVICE_SET_VALUE,
{CONF_ENTITY_ID: _VALUE_INPUT_NUMBER, INPUT_NUMBER_ATTR_VALUE: 5},
blocking=True,
)
await hass.async_block_till_done()
_verify(hass, 5, 1, 3, 5, None)
await async_trigger(hass, TEST_STEP_ENTITY_ID, 2)
await hass.services.async_call(
INPUT_NUMBER_DOMAIN,
INPUT_NUMBER_SERVICE_SET_VALUE,
{CONF_ENTITY_ID: _STEP_INPUT_NUMBER, INPUT_NUMBER_ATTR_VALUE: 2},
blocking=True,
)
await hass.async_block_till_done()
_verify(hass, 5, 2, 3, 5, None)
await async_trigger(hass, TEST_MINIMUM_ENTITY_ID, 2)
await hass.services.async_call(
INPUT_NUMBER_DOMAIN,
INPUT_NUMBER_SERVICE_SET_VALUE,
{CONF_ENTITY_ID: _MINIMUM_INPUT_NUMBER, INPUT_NUMBER_ATTR_VALUE: 2},
blocking=True,
)
await hass.async_block_till_done()
_verify(hass, 5, 2, 2, 5, None)
await async_trigger(hass, TEST_MAXIMUM_ENTITY_ID, 6)
await hass.services.async_call(
INPUT_NUMBER_DOMAIN,
INPUT_NUMBER_SERVICE_SET_VALUE,
{CONF_ENTITY_ID: _MAXIMUM_INPUT_NUMBER, INPUT_NUMBER_ATTR_VALUE: 6},
blocking=True,
)
await hass.async_block_till_done()
_verify(hass, 5, 2, 2, 6, None)
await hass.services.async_call(
NUMBER_DOMAIN,
NUMBER_SERVICE_SET_VALUE,
{CONF_ENTITY_ID: TEST_NUMBER.entity_id, NUMBER_ATTR_VALUE: 2},
{CONF_ENTITY_ID: _TEST_NUMBER, NUMBER_ATTR_VALUE: 2},
blocking=True,
)
_verify(hass, 2, 2, 2, 6, None)
# Check this variable can be used in set_value script
assert len(calls) == 1
assert calls[-1].data["action"] == "set_value"
assert calls[-1].data["caller"] == TEST_NUMBER.entity_id
assert calls[-1].data["caller"] == _TEST_NUMBER
assert calls[-1].data["value"] == 2
await async_trigger(hass, TEST_STATE_ENTITY_ID, 2)
_verify(hass, 2, 2, 2, 6, None)
async def test_trigger_number(hass: HomeAssistant) -> None:
"""Test trigger based template number."""
events = async_capture_events(hass, "test_number_event")
assert await setup.async_setup_component(
hass,
"template",
{
"template": [
{"invalid": "config"},
# Config after invalid should still be set up
{
"unique_id": "listening-test-event",
"trigger": {"platform": "event", "event_type": "test_event"},
"number": [
{
"name": "Hello Name",
"unique_id": "hello_name-id",
"state": "{{ trigger.event.data.beers_drank }}",
"min": "{{ trigger.event.data.min_beers }}",
"max": "{{ trigger.event.data.max_beers }}",
"step": "{{ trigger.event.data.step }}",
"unit_of_measurement": "beer",
"set_value": {
"event": "test_number_event",
"event_data": {"entity_id": "{{ this.entity_id }}"},
},
"optimistic": True,
},
],
},
],
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
state = hass.states.get("number.hello_name")
assert state is not None
assert state.state == STATE_UNKNOWN
assert state.attributes["min"] == 0.0
assert state.attributes["max"] == 100.0
assert state.attributes["step"] == 1.0
assert state.attributes["unit_of_measurement"] == "beer"
context = Context()
hass.bus.async_fire(
"test_event",
{
"beers_drank": 3,
"min_beers": 1.0,
"max_beers": 5.0,
"step": 0.5,
},
context=context,
)
await hass.async_block_till_done()
state = hass.states.get("number.hello_name")
assert state is not None
assert state.state == "3.0"
assert state.attributes["min"] == 1.0
assert state.attributes["max"] == 5.0
assert state.attributes["step"] == 0.5
await hass.services.async_call(
NUMBER_DOMAIN,
NUMBER_SERVICE_SET_VALUE,
{CONF_ENTITY_ID: "number.hello_name", NUMBER_ATTR_VALUE: 2},
blocking=True,
)
assert len(events) == 1
assert events[0].event_type == "test_number_event"
entity_id = events[0].data.get("entity_id")
assert entity_id is not None
assert entity_id == "number.hello_name"
def _verify(
@@ -256,7 +465,7 @@ def _verify(
expected_unit_of_measurement: str | None,
) -> None:
"""Verify number's state."""
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
attributes = state.attributes
assert state.state == str(float(expected_value))
assert attributes.get(ATTR_STEP) == float(expected_step)
@@ -271,7 +480,7 @@ def _verify(
[(ConfigurationStyle.MODERN, ""), (ConfigurationStyle.TRIGGER, None)],
)
@pytest.mark.parametrize(
("config", "attribute", "expected"),
("number_config", "attribute", "expected"),
[
(
{
@@ -299,12 +508,13 @@ async def test_templated_optional_config(
initial_expected_state: str | None,
) -> None:
"""Test optional config templates."""
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert state.attributes.get(attribute) == initial_expected_state
await async_trigger(hass, TEST_STATE_ENTITY_ID, "1")
state = hass.states.async_set(TEST_STATE_ENTITY_ID, "1")
await hass.async_block_till_done()
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert state.attributes[attribute] == expected
@@ -357,7 +567,7 @@ async def test_device_id(
@pytest.mark.parametrize(
("count", "config"),
("count", "number_config"),
[
(
1,
@@ -377,26 +587,26 @@ async def test_optimistic(hass: HomeAssistant) -> None:
await hass.services.async_call(
number.DOMAIN,
number.SERVICE_SET_VALUE,
{ATTR_ENTITY_ID: TEST_NUMBER.entity_id, "value": 4},
{ATTR_ENTITY_ID: _TEST_NUMBER, "value": 4},
blocking=True,
)
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert float(state.state) == 4
await hass.services.async_call(
number.DOMAIN,
number.SERVICE_SET_VALUE,
{ATTR_ENTITY_ID: TEST_NUMBER.entity_id, "value": 2},
{ATTR_ENTITY_ID: _TEST_NUMBER, "value": 2},
blocking=True,
)
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert float(state.state) == 2
@pytest.mark.parametrize(
("count", "config"),
("count", "number_config"),
[
(
1,
@@ -418,16 +628,16 @@ async def test_not_optimistic(hass: HomeAssistant) -> None:
await hass.services.async_call(
number.DOMAIN,
number.SERVICE_SET_VALUE,
{ATTR_ENTITY_ID: TEST_NUMBER.entity_id, "value": 4},
{ATTR_ENTITY_ID: _TEST_NUMBER, "value": 4},
blocking=True,
)
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert state.state == STATE_UNKNOWN
@pytest.mark.parametrize(
("count", "config"),
("count", "number_config"),
[
(
1,
@@ -446,29 +656,34 @@ async def test_not_optimistic(hass: HomeAssistant) -> None:
async def test_availability(hass: HomeAssistant) -> None:
"""Test configuration with optimistic state."""
await async_trigger(hass, TEST_STATE_ENTITY_ID, "4.0")
await async_trigger(hass, TEST_AVAILABILITY_ENTITY_ID, "on")
state = hass.states.get(TEST_NUMBER.entity_id)
hass.states.async_set(TEST_AVAILABILITY_ENTITY_ID, "on")
hass.states.async_set(TEST_STATE_ENTITY_ID, "4.0")
await hass.async_block_till_done()
state = hass.states.get(_TEST_NUMBER)
assert float(state.state) == 4
await async_trigger(hass, TEST_AVAILABILITY_ENTITY_ID, "off")
hass.states.async_set(TEST_AVAILABILITY_ENTITY_ID, "off")
await hass.async_block_till_done()
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert state.state == STATE_UNAVAILABLE
await async_trigger(hass, TEST_STATE_ENTITY_ID, "2.0")
hass.states.async_set(TEST_STATE_ENTITY_ID, "2.0")
await hass.async_block_till_done()
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert state.state == STATE_UNAVAILABLE
await async_trigger(hass, TEST_AVAILABILITY_ENTITY_ID, "on")
hass.states.async_set(TEST_AVAILABILITY_ENTITY_ID, "on")
await hass.async_block_till_done()
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert float(state.state) == 2
@pytest.mark.parametrize(
("count", "config"),
("count", "number_config"),
[
(
1,
@@ -487,17 +702,16 @@ async def test_availability(hass: HomeAssistant) -> None:
ConfigurationStyle.MODERN,
],
)
@pytest.mark.usefixtures("setup_number")
async def test_empty_action_config(hass: HomeAssistant) -> None:
async def test_empty_action_config(hass: HomeAssistant, setup_number) -> None:
"""Test configuration with empty script."""
await hass.services.async_call(
number.DOMAIN,
number.SERVICE_SET_VALUE,
{ATTR_ENTITY_ID: TEST_NUMBER.entity_id, "value": 4},
{ATTR_ENTITY_ID: _TEST_NUMBER, "value": 4},
blocking=True,
)
state = hass.states.get(TEST_NUMBER.entity_id)
state = hass.states.get(_TEST_NUMBER)
assert float(state.state) == 4
@@ -520,29 +734,3 @@ async def test_flow_preview(
)
assert state["state"] == "0.0"
@pytest.mark.parametrize(
"style",
[ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_unique_id(
hass: HomeAssistant,
style: ConfigurationStyle,
) -> None:
"""Test unique_id option only creates one vacuum per id."""
await setup_and_test_unique_id(hass, TEST_NUMBER, style, TEST_REQUIRED, "{{ 0 }}")
@pytest.mark.parametrize(
"style", [ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER]
)
async def test_nested_unique_id(
hass: HomeAssistant,
style: ConfigurationStyle,
entity_registry: er.EntityRegistry,
) -> None:
"""Test a template unique_id propagates to vacuum unique_ids."""
await setup_and_test_nested_unique_id(
hass, TEST_NUMBER, style, entity_registry, TEST_REQUIRED, "{{ 0 }}"
)

View File

@@ -6,7 +6,14 @@ import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant import setup
from homeassistant.components import select
from homeassistant.components import select, template
from homeassistant.components.input_select import (
ATTR_OPTION as INPUT_SELECT_ATTR_OPTION,
ATTR_OPTIONS as INPUT_SELECT_ATTR_OPTIONS,
DOMAIN as INPUT_SELECT_DOMAIN,
SERVICE_SELECT_OPTION as INPUT_SELECT_SERVICE_SELECT_OPTION,
SERVICE_SET_OPTIONS,
)
from homeassistant.components.select import (
ATTR_OPTION as SELECT_ATTR_OPTION,
ATTR_OPTIONS as SELECT_ATTR_OPTIONS,
@@ -24,46 +31,77 @@ from homeassistant.const import (
STATE_UNAVAILABLE,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant, ServiceCall
from homeassistant.core import Context, HomeAssistant, ServiceCall
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.setup import async_setup_component
from .conftest import (
ConfigurationStyle,
TemplatePlatformSetup,
async_get_flow_preview_state,
async_trigger,
make_test_trigger,
setup_and_test_nested_unique_id,
setup_and_test_unique_id,
setup_entity,
)
from .conftest import ConfigurationStyle, async_get_flow_preview_state
from tests.common import MockConfigEntry, assert_setup_component
from tests.common import MockConfigEntry, assert_setup_component, async_capture_events
from tests.conftest import WebSocketGenerator
_TEST_OBJECT_ID = "template_select"
_TEST_SELECT = f"select.{_TEST_OBJECT_ID}"
# Represent for select's current_option
_OPTION_INPUT_SELECT = "input_select.option"
TEST_STATE_ENTITY_ID = "select.test_state"
TEST_AVAILABILITY_ENTITY_ID = "binary_sensor.test_availability"
TEST_STATE_TRIGGER = {
"trigger": {
"trigger": "state",
"entity_id": [
_OPTION_INPUT_SELECT,
TEST_STATE_ENTITY_ID,
TEST_AVAILABILITY_ENTITY_ID,
],
},
"variables": {"triggering_entity": "{{ trigger.entity_id }}"},
"action": [
{"event": "action_event", "event_data": {"what": "{{ triggering_entity }}"}}
],
}
TEST_SELECT = TemplatePlatformSetup(
select.DOMAIN,
None,
"template_select",
make_test_trigger(TEST_STATE_ENTITY_ID, TEST_AVAILABILITY_ENTITY_ID),
)
TEST_OPTIONS_WITHOUT_STATE = {
TEST_OPTIONS = {
"state": "test",
"options": "{{ ['test', 'yes', 'no'] }}",
"select_option": [],
}
TEST_OPTIONS = {"state": "test", **TEST_OPTIONS_WITHOUT_STATE}
TEST_OPTION_ACTION = {
"action": "test.automation",
"data": {
"action": "select_option",
"caller": "{{ this.entity_id }}",
"option": "{{ option }}",
},
}
async def async_setup_modern_format(
hass: HomeAssistant, count: int, select_config: dict[str, Any]
) -> None:
"""Do setup of select integration via new format."""
config = {"template": {"select": select_config}}
with assert_setup_component(count, template.DOMAIN):
assert await async_setup_component(
hass,
template.DOMAIN,
config,
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
async def async_setup_trigger_format(
hass: HomeAssistant, count: int, select_config: dict[str, Any]
) -> None:
"""Do setup of select integration via trigger format."""
config = {"template": {**TEST_STATE_TRIGGER, "select": select_config}}
with assert_setup_component(count, template.DOMAIN):
assert await async_setup_component(
hass,
template.DOMAIN,
config,
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
@pytest.fixture
@@ -71,10 +109,17 @@ async def setup_select(
hass: HomeAssistant,
count: int,
style: ConfigurationStyle,
config: dict[str, Any],
select_config: dict[str, Any],
) -> None:
"""Do setup of select integration."""
await setup_entity(hass, TEST_SELECT, style, count, config)
if style == ConfigurationStyle.MODERN:
await async_setup_modern_format(
hass, count, {"name": _TEST_OBJECT_ID, **select_config}
)
if style == ConfigurationStyle.TRIGGER:
await async_setup_trigger_format(
hass, count, {"name": _TEST_OBJECT_ID, **select_config}
)
async def test_setup_config_entry(
@@ -91,7 +136,6 @@ async def test_setup_config_entry(
"template_type": "select",
"state": "{{ 'on' }}",
"options": "{{ ['off', 'on', 'auto'] }}",
"select_option": [],
},
title="My template",
)
@@ -105,24 +149,27 @@ async def test_setup_config_entry(
assert state == snapshot
@pytest.mark.parametrize("count", [1])
@pytest.mark.parametrize(
"config",
[
{
"state": "{{ 'a' }}",
"select_option": {"service": "script.select_option"},
"options": "{{ ['a', 'b'] }}",
},
],
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER]
)
@pytest.mark.usefixtures("setup_select")
async def test_missing_optional_config(hass: HomeAssistant) -> None:
"""Test: missing optional template is ok."""
await async_trigger(hass, TEST_STATE_ENTITY_ID, "anything")
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"select": {
"state": "{{ 'a' }}",
"select_option": {"service": "script.select_option"},
"options": "{{ ['a', 'b'] }}",
}
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
_verify(hass, "a", ["a", "b"])
@@ -155,85 +202,231 @@ async def test_multiple_configs(hass: HomeAssistant) -> None:
await hass.async_block_till_done()
_verify(hass, "a", ["a", "b"])
_verify(hass, "a", ["a", "b"], f"{TEST_SELECT.entity_id}_2")
_verify(hass, "a", ["a", "b"], f"{_TEST_SELECT}_2")
@pytest.mark.parametrize("count", [0])
@pytest.mark.parametrize(
"config",
[
{
"state": "{{ 'a' }}",
"select_option": {"service": "script.select_option"},
},
{
"state": "{{ 'a' }}",
"options": "{{ ['a', 'b'] }}",
},
],
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER]
)
@pytest.mark.usefixtures("setup_select")
async def test_missing_required_keys(hass: HomeAssistant) -> None:
"""Test: missing required fields will fail."""
with assert_setup_component(0, "select"):
assert await setup.async_setup_component(
hass,
"select",
{
"template": {
"select": {
"state": "{{ 'a' }}",
"select_option": {"service": "script.select_option"},
}
}
},
)
with assert_setup_component(0, "select"):
assert await setup.async_setup_component(
hass,
"select",
{
"template": {
"select": {
"state": "{{ 'a' }}",
"options": "{{ ['a', 'b'] }}",
}
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
assert hass.states.async_all("select") == []
@pytest.mark.parametrize(
("count", "config"),
[
(
1,
async def test_templates_with_entities(
hass: HomeAssistant, entity_registry: er.EntityRegistry, calls: list[ServiceCall]
) -> None:
"""Test templates with values from other entities."""
with assert_setup_component(1, "input_select"):
assert await setup.async_setup_component(
hass,
"input_select",
{
"options": "{{ state_attr('select.test_state', 'options') or [] }}",
"select_option": [TEST_OPTION_ACTION],
"state": "{{ states('select.test_state') }}",
"input_select": {
"option": {
"options": ["a", "b"],
"initial": "a",
"name": "Option",
},
}
},
)
],
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER]
)
@pytest.mark.usefixtures("setup_select")
async def test_template_select(hass: HomeAssistant, calls: list[ServiceCall]) -> None:
"""Test templates with values from other entities."""
attributes = {"options": ["a", "b"]}
await async_trigger(hass, TEST_STATE_ENTITY_ID, "a", attributes)
with assert_setup_component(1, "template"):
assert await setup.async_setup_component(
hass,
"template",
{
"template": {
"unique_id": "b",
"select": {
"state": f"{{{{ states('{_OPTION_INPUT_SELECT}') }}}}",
"options": f"{{{{ state_attr('{_OPTION_INPUT_SELECT}', '{INPUT_SELECT_ATTR_OPTIONS}') }}}}",
"select_option": [
{
"service": "input_select.select_option",
"data_template": {
"entity_id": _OPTION_INPUT_SELECT,
"option": "{{ option }}",
},
},
{
"service": "test.automation",
"data_template": {
"action": "select_option",
"caller": "{{ this.entity_id }}",
"option": "{{ option }}",
},
},
],
"optimistic": True,
"unique_id": "a",
},
}
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
entry = entity_registry.async_get(_TEST_SELECT)
assert entry
assert entry.unique_id == "b-a"
_verify(hass, "a", ["a", "b"])
await async_trigger(hass, TEST_STATE_ENTITY_ID, "b", attributes)
await hass.services.async_call(
INPUT_SELECT_DOMAIN,
INPUT_SELECT_SERVICE_SELECT_OPTION,
{CONF_ENTITY_ID: _OPTION_INPUT_SELECT, INPUT_SELECT_ATTR_OPTION: "b"},
blocking=True,
)
await hass.async_block_till_done()
_verify(hass, "b", ["a", "b"])
attributes = {"options": ["a", "b", "c"]}
await async_trigger(hass, TEST_STATE_ENTITY_ID, "b", attributes)
await hass.services.async_call(
INPUT_SELECT_DOMAIN,
SERVICE_SET_OPTIONS,
{
CONF_ENTITY_ID: _OPTION_INPUT_SELECT,
INPUT_SELECT_ATTR_OPTIONS: ["a", "b", "c"],
},
blocking=True,
)
await hass.async_block_till_done()
_verify(hass, "b", ["a", "b", "c"])
await hass.services.async_call(
SELECT_DOMAIN,
SELECT_SERVICE_SELECT_OPTION,
{CONF_ENTITY_ID: TEST_SELECT.entity_id, SELECT_ATTR_OPTION: "c"},
{CONF_ENTITY_ID: _TEST_SELECT, SELECT_ATTR_OPTION: "c"},
blocking=True,
)
_verify(hass, "c", ["a", "b", "c"])
# Check this variable can be used in set_value script
assert len(calls) == 1
assert calls[-1].data["action"] == "select_option"
assert calls[-1].data["caller"] == TEST_SELECT.entity_id
assert calls[-1].data["caller"] == _TEST_SELECT
assert calls[-1].data["option"] == "c"
await async_trigger(hass, TEST_STATE_ENTITY_ID, "c", attributes)
_verify(hass, "c", ["a", "b", "c"])
async def test_trigger_select(hass: HomeAssistant) -> None:
"""Test trigger based template select."""
events = async_capture_events(hass, "test_number_event")
action_events = async_capture_events(hass, "action_event")
assert await setup.async_setup_component(
hass,
"template",
{
"template": [
{"invalid": "config"},
# Config after invalid should still be set up
{
"unique_id": "listening-test-event",
"trigger": {"platform": "event", "event_type": "test_event"},
"variables": {"beer": "{{ trigger.event.data.beer }}"},
"action": [
{"event": "action_event", "event_data": {"beer": "{{ beer }}"}}
],
"select": [
{
"name": "Hello Name",
"unique_id": "hello_name-id",
"state": "{{ trigger.event.data.beer }}",
"options": "{{ trigger.event.data.beers }}",
"select_option": {
"event": "test_number_event",
"event_data": {
"entity_id": "{{ this.entity_id }}",
"beer": "{{ beer }}",
},
},
"optimistic": True,
},
],
},
],
},
)
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
state = hass.states.get("select.hello_name")
assert state is not None
assert state.state == STATE_UNKNOWN
context = Context()
hass.bus.async_fire(
"test_event", {"beer": "duff", "beers": ["duff", "alamo"]}, context=context
)
await hass.async_block_till_done()
state = hass.states.get("select.hello_name")
assert state is not None
assert state.state == "duff"
assert state.attributes["options"] == ["duff", "alamo"]
assert len(action_events) == 1
assert action_events[0].event_type == "action_event"
beer = action_events[0].data.get("beer")
assert beer is not None
assert beer == "duff"
await hass.services.async_call(
SELECT_DOMAIN,
SELECT_SERVICE_SELECT_OPTION,
{CONF_ENTITY_ID: "select.hello_name", SELECT_ATTR_OPTION: "alamo"},
blocking=True,
)
assert len(events) == 1
assert events[0].event_type == "test_number_event"
entity_id = events[0].data.get("entity_id")
assert entity_id is not None
assert entity_id == "select.hello_name"
beer = events[0].data.get("beer")
assert beer is not None
assert beer == "duff"
def _verify(
hass: HomeAssistant,
expected_current_option: str,
expected_options: list[str],
entity_name: str = TEST_SELECT.entity_id,
entity_name: str = _TEST_SELECT,
) -> None:
"""Verify select's state."""
state = hass.states.get(entity_name)
@@ -248,7 +441,7 @@ def _verify(
[(ConfigurationStyle.MODERN, ""), (ConfigurationStyle.TRIGGER, None)],
)
@pytest.mark.parametrize(
("config", "attribute", "expected"),
("select_config", "attribute", "expected"),
[
(
{
@@ -276,13 +469,13 @@ async def test_templated_optional_config(
initial_expected_state: str | None,
) -> None:
"""Test optional config templates."""
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.attributes.get(attribute) == initial_expected_state
state = hass.states.async_set(TEST_STATE_ENTITY_ID, "yes")
await hass.async_block_till_done()
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.attributes[attribute] == expected
@@ -313,7 +506,6 @@ async def test_device_id(
"template_type": "select",
"state": "{{ 'on' }}",
"options": "{{ ['off', 'on', 'auto'] }}",
"select_option": [],
"device_id": device_entry.id,
},
title="My template",
@@ -329,7 +521,7 @@ async def test_device_id(
@pytest.mark.parametrize(
("count", "config"),
("count", "select_config"),
[
(
1,
@@ -348,22 +540,21 @@ async def test_device_id(
ConfigurationStyle.MODERN,
],
)
@pytest.mark.usefixtures("setup_select")
async def test_empty_action_config(hass: HomeAssistant) -> None:
async def test_empty_action_config(hass: HomeAssistant, setup_select) -> None:
"""Test configuration with empty script."""
await hass.services.async_call(
select.DOMAIN,
select.SERVICE_SELECT_OPTION,
{ATTR_ENTITY_ID: TEST_SELECT.entity_id, "option": "a"},
{ATTR_ENTITY_ID: _TEST_SELECT, "option": "a"},
blocking=True,
)
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == "a"
@pytest.mark.parametrize(
("count", "config"),
("count", "select_config"),
[
(
1,
@@ -382,7 +573,7 @@ async def test_empty_action_config(hass: HomeAssistant) -> None:
async def test_optimistic(hass: HomeAssistant) -> None:
"""Test configuration with optimistic state."""
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == STATE_UNKNOWN
# Ensure Trigger template entities update.
@@ -392,26 +583,26 @@ async def test_optimistic(hass: HomeAssistant) -> None:
await hass.services.async_call(
select.DOMAIN,
select.SERVICE_SELECT_OPTION,
{ATTR_ENTITY_ID: TEST_SELECT.entity_id, "option": "test"},
{ATTR_ENTITY_ID: _TEST_SELECT, "option": "test"},
blocking=True,
)
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == "test"
await hass.services.async_call(
select.DOMAIN,
select.SERVICE_SELECT_OPTION,
{ATTR_ENTITY_ID: TEST_SELECT.entity_id, "option": "yes"},
{ATTR_ENTITY_ID: _TEST_SELECT, "option": "yes"},
blocking=True,
)
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == "yes"
@pytest.mark.parametrize(
("count", "config"),
("count", "select_config"),
[
(
1,
@@ -438,16 +629,16 @@ async def test_not_optimistic(hass: HomeAssistant) -> None:
await hass.services.async_call(
select.DOMAIN,
select.SERVICE_SELECT_OPTION,
{ATTR_ENTITY_ID: TEST_SELECT.entity_id, "option": "test"},
{ATTR_ENTITY_ID: _TEST_SELECT, "option": "test"},
blocking=True,
)
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == STATE_UNKNOWN
@pytest.mark.parametrize(
("count", "config"),
("count", "select_config"),
[
(
1,
@@ -471,25 +662,25 @@ async def test_availability(hass: HomeAssistant) -> None:
hass.states.async_set(TEST_STATE_ENTITY_ID, "test")
await hass.async_block_till_done()
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == "test"
hass.states.async_set(TEST_AVAILABILITY_ENTITY_ID, "off")
await hass.async_block_till_done()
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == STATE_UNAVAILABLE
hass.states.async_set(TEST_STATE_ENTITY_ID, "yes")
await hass.async_block_till_done()
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == STATE_UNAVAILABLE
hass.states.async_set(TEST_AVAILABILITY_ENTITY_ID, "on")
await hass.async_block_till_done()
state = hass.states.get(TEST_SELECT.entity_id)
state = hass.states.get(_TEST_SELECT)
assert state.state == "yes"
@@ -507,36 +698,3 @@ async def test_flow_preview(
)
assert state["state"] == "test"
@pytest.mark.parametrize(
"style",
[ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER],
)
async def test_unique_id(
hass: HomeAssistant,
style: ConfigurationStyle,
) -> None:
"""Test unique_id option only creates one vacuum per id."""
await setup_and_test_unique_id(
hass, TEST_SELECT, style, TEST_OPTIONS_WITHOUT_STATE, "{{ 'test' }}"
)
@pytest.mark.parametrize(
"style", [ConfigurationStyle.MODERN, ConfigurationStyle.TRIGGER]
)
async def test_nested_unique_id(
hass: HomeAssistant,
style: ConfigurationStyle,
entity_registry: er.EntityRegistry,
) -> None:
"""Test a template unique_id propagates to vacuum unique_ids."""
await setup_and_test_nested_unique_id(
hass,
TEST_SELECT,
style,
entity_registry,
TEST_OPTIONS_WITHOUT_STATE,
"{{ 'test' }}",
)

View File

@@ -721,179 +721,3 @@ async def test_binary_sensor_person_detected(
ufp.ws_msg(mock_msg)
await hass.async_block_till_done()
assert len(state_changes) == 2
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_binary_sensor_simultaneous_person_and_vehicle_detection(
hass: HomeAssistant,
ufp: MockUFPFixture,
doorbell: Camera,
unadopted_camera: Camera,
fixed_now: datetime,
) -> None:
"""Test that when an event is updated with additional detection types, both trigger.
This is a regression test for https://github.com/home-assistant/core/issues/152133
where an event starting with vehicle detection gets updated to also include person
detection (e.g., someone getting out of a car). Both sensors should be ON
simultaneously, not queued.
"""
await init_entry(hass, ufp, [doorbell, unadopted_camera])
assert_entity_counts(hass, Platform.BINARY_SENSOR, 15, 15)
doorbell.smart_detect_settings.object_types.append(SmartDetectObjectType.PERSON)
doorbell.smart_detect_settings.object_types.append(SmartDetectObjectType.VEHICLE)
# Get entity IDs for both person and vehicle detection
_, person_entity_id = await ids_from_device_description(
hass,
Platform.BINARY_SENSOR,
doorbell,
EVENT_SENSORS[3], # person detected
)
_, vehicle_entity_id = await ids_from_device_description(
hass,
Platform.BINARY_SENSOR,
doorbell,
EVENT_SENSORS[4], # vehicle detected
)
# Step 1: Initial event with only VEHICLE detection (car arriving)
event = Event(
model=ModelType.EVENT,
id="combined_event_id",
type=EventType.SMART_DETECT,
start=fixed_now - timedelta(seconds=5),
end=None, # Event is ongoing
score=90,
smart_detect_types=[SmartDetectObjectType.VEHICLE],
smart_detect_event_ids=[],
camera_id=doorbell.id,
api=ufp.api,
)
new_camera = doorbell.model_copy()
new_camera.is_smart_detected = True
new_camera.last_smart_detect_event_ids[SmartDetectObjectType.VEHICLE] = event.id
ufp.api.bootstrap.cameras = {new_camera.id: new_camera}
ufp.api.bootstrap.events = {event.id: event}
mock_msg = Mock()
mock_msg.changed_data = {}
mock_msg.new_obj = event
ufp.ws_msg(mock_msg)
await hass.async_block_till_done()
# Vehicle sensor should be ON
vehicle_state = hass.states.get(vehicle_entity_id)
assert vehicle_state
assert vehicle_state.state == STATE_ON, "Vehicle detection should be ON"
# Person sensor should still be OFF (no person detected yet)
person_state = hass.states.get(person_entity_id)
assert person_state
assert person_state.state == STATE_OFF, "Person detection should be OFF initially"
# Step 2: Same event gets updated to include PERSON detection
# (someone gets out of the car - Protect adds PERSON to the same event)
#
# BUG SCENARIO: UniFi Protect updates the event to include PERSON in
# smart_detect_types, BUT does NOT update last_smart_detect_event_ids[PERSON]
# until the event ends. This is the core issue reported in #152133.
updated_event = Event(
model=ModelType.EVENT,
id="combined_event_id", # Same event ID!
type=EventType.SMART_DETECT,
start=fixed_now - timedelta(seconds=5),
end=None, # Event still ongoing
score=90,
smart_detect_types=[
SmartDetectObjectType.VEHICLE,
SmartDetectObjectType.PERSON,
],
smart_detect_event_ids=[],
camera_id=doorbell.id,
api=ufp.api,
)
# IMPORTANT: The camera's last_smart_detect_event_ids is NOT updated for PERSON!
# This simulates the real bug where UniFi Protect doesn't immediately update
# the camera's last_smart_detect_event_ids when a new detection type is added
# to an ongoing event.
new_camera = doorbell.model_copy()
new_camera.is_smart_detected = True
# Only VEHICLE has the event ID - PERSON does not (simulating the bug)
new_camera.last_smart_detect_event_ids[SmartDetectObjectType.VEHICLE] = (
updated_event.id
)
# NOTE: We're NOT setting last_smart_detect_event_ids[PERSON] to simulate the bug!
ufp.api.bootstrap.cameras = {new_camera.id: new_camera}
ufp.api.bootstrap.events = {updated_event.id: updated_event}
mock_msg = Mock()
mock_msg.changed_data = {}
mock_msg.new_obj = updated_event
ufp.ws_msg(mock_msg)
await hass.async_block_till_done()
# CRITICAL: Both sensors should now be ON simultaneously
vehicle_state = hass.states.get(vehicle_entity_id)
assert vehicle_state
assert vehicle_state.state == STATE_ON, (
"Vehicle detection should still be ON after event update"
)
person_state = hass.states.get(person_entity_id)
assert person_state
assert person_state.state == STATE_ON, (
"Person detection should be ON immediately when added to event, "
"not waiting for vehicle detection to end"
)
# Verify both have correct attributes
assert vehicle_state.attributes[ATTR_EVENT_SCORE] == 90
assert person_state.attributes[ATTR_EVENT_SCORE] == 90
# Step 3: Event ends - both sensors should turn OFF
ended_event = Event(
model=ModelType.EVENT,
id="combined_event_id",
type=EventType.SMART_DETECT,
start=fixed_now - timedelta(seconds=5),
end=fixed_now, # Event ended now
score=90,
smart_detect_types=[
SmartDetectObjectType.VEHICLE,
SmartDetectObjectType.PERSON,
],
smart_detect_event_ids=[],
camera_id=doorbell.id,
api=ufp.api,
)
ufp.api.bootstrap.events = {ended_event.id: ended_event}
mock_msg = Mock()
mock_msg.changed_data = {}
mock_msg.new_obj = ended_event
ufp.ws_msg(mock_msg)
await hass.async_block_till_done()
# Both should be OFF now
vehicle_state = hass.states.get(vehicle_entity_id)
assert vehicle_state
assert vehicle_state.state == STATE_OFF, (
"Vehicle detection should be OFF after event ends"
)
person_state = hass.states.get(person_entity_id)
assert person_state
assert person_state.state == STATE_OFF, (
"Person detection should be OFF after event ends"
)

View File

@@ -0,0 +1,190 @@
"""Test vacuum conditions."""
from typing import Any
import pytest
from homeassistant.components.vacuum import VacuumActivity
from homeassistant.core import HomeAssistant
from tests.components import (
ConditionStateDescription,
assert_condition_gated_by_labs_flag,
create_target_condition,
other_states,
parametrize_condition_states_all,
parametrize_condition_states_any,
parametrize_target_entities,
set_or_remove_state,
target_entities,
)
@pytest.fixture
async def target_vacuums(hass: HomeAssistant) -> list[str]:
"""Create multiple vacuum entities associated with different targets."""
return (await target_entities(hass, "vacuum"))["included"]
@pytest.mark.parametrize(
"condition",
[
"vacuum.is_cleaning",
"vacuum.is_docked",
"vacuum.is_encountering_an_error",
"vacuum.is_paused",
"vacuum.is_returning",
],
)
async def test_vacuum_conditions_gated_by_labs_flag(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, condition: str
) -> None:
"""Test the vacuum conditions are gated by the labs flag."""
await assert_condition_gated_by_labs_flag(hass, caplog, condition)
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("condition_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities("vacuum"),
)
@pytest.mark.parametrize(
("condition", "condition_options", "states"),
[
*parametrize_condition_states_any(
condition="vacuum.is_cleaning",
target_states=[VacuumActivity.CLEANING],
other_states=other_states(VacuumActivity.CLEANING),
),
*parametrize_condition_states_any(
condition="vacuum.is_docked",
target_states=[VacuumActivity.DOCKED],
other_states=other_states(VacuumActivity.DOCKED),
),
*parametrize_condition_states_any(
condition="vacuum.is_encountering_an_error",
target_states=[VacuumActivity.ERROR],
other_states=other_states(VacuumActivity.ERROR),
),
*parametrize_condition_states_any(
condition="vacuum.is_paused",
target_states=[VacuumActivity.PAUSED],
other_states=other_states(VacuumActivity.PAUSED),
),
*parametrize_condition_states_any(
condition="vacuum.is_returning",
target_states=[VacuumActivity.RETURNING],
other_states=other_states(VacuumActivity.RETURNING),
),
],
)
async def test_vacuum_state_condition_behavior_any(
hass: HomeAssistant,
target_vacuums: list[str],
condition_target_config: dict,
entity_id: str,
entities_in_target: int,
condition: str,
condition_options: dict[str, Any],
states: list[ConditionStateDescription],
) -> None:
"""Test the vacuum state condition with the 'any' behavior."""
other_entity_ids = set(target_vacuums) - {entity_id}
# Set all vacuums, including the tested vacuum, to the initial state
for eid in target_vacuums:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
condition=condition,
target=condition_target_config,
behavior="any",
)
for state in states:
included_state = state["included"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
# Check if changing other vacuums also passes the condition
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("condition_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities("vacuum"),
)
@pytest.mark.parametrize(
("condition", "condition_options", "states"),
[
*parametrize_condition_states_all(
condition="vacuum.is_cleaning",
target_states=[VacuumActivity.CLEANING],
other_states=other_states(VacuumActivity.CLEANING),
),
*parametrize_condition_states_all(
condition="vacuum.is_docked",
target_states=[VacuumActivity.DOCKED],
other_states=other_states(VacuumActivity.DOCKED),
),
*parametrize_condition_states_all(
condition="vacuum.is_encountering_an_error",
target_states=[VacuumActivity.ERROR],
other_states=other_states(VacuumActivity.ERROR),
),
*parametrize_condition_states_all(
condition="vacuum.is_paused",
target_states=[VacuumActivity.PAUSED],
other_states=other_states(VacuumActivity.PAUSED),
),
*parametrize_condition_states_all(
condition="vacuum.is_returning",
target_states=[VacuumActivity.RETURNING],
other_states=other_states(VacuumActivity.RETURNING),
),
],
)
async def test_vacuum_state_condition_behavior_all(
hass: HomeAssistant,
target_vacuums: list[str],
condition_target_config: dict,
entity_id: str,
entities_in_target: int,
condition: str,
condition_options: dict[str, Any],
states: list[ConditionStateDescription],
) -> None:
"""Test the vacuum state condition with the 'all' behavior."""
other_entity_ids = set(target_vacuums) - {entity_id}
# Set all vacuums, including the tested vacuum, to the initial state
for eid in target_vacuums:
set_or_remove_state(hass, eid, states[0]["included"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
condition=condition,
target=condition_target_config,
behavior="all",
)
for state in states:
included_state = state["included"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true_first_entity"]
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]