Compare commits

..

2 Commits

Author SHA1 Message Date
Jan Čermák
d1d105a3a2 Sleep twice as suggested in PR 2026-03-13 16:29:19 +01:00
Jan Čermák
3102fad342 Bump base image to 2026.02.0 with Python 3.14.3, use 3.14.3 in CI
This also bumps libcec used in the base image to 7.1.1, full changelog:
* https://github.com/home-assistant/docker/releases/tag/2026.02.0

Python changelog:
* https://docs.python.org/release/3.14.3/whatsnew/changelog.html
2026-03-13 16:28:33 +01:00
149 changed files with 665 additions and 7199 deletions

View File

@@ -14,7 +14,7 @@ env:
UV_HTTP_TIMEOUT: 60
UV_SYSTEM_PYTHON: "true"
# Base image version from https://github.com/home-assistant/docker
BASE_IMAGE_VERSION: "2026.01.0"
BASE_IMAGE_VERSION: "2026.02.0"
ARCHITECTURES: '["amd64", "aarch64"]'
permissions: {}

View File

@@ -1 +1 @@
3.14.2
3.14.3

4
CODEOWNERS generated
View File

@@ -186,8 +186,6 @@ build.json @home-assistant/supervisor
/tests/components/auth/ @home-assistant/core
/homeassistant/components/automation/ @home-assistant/core
/tests/components/automation/ @home-assistant/core
/homeassistant/components/autoskope/ @mcisk
/tests/components/autoskope/ @mcisk
/homeassistant/components/avea/ @pattyland
/homeassistant/components/awair/ @ahayworth @ricohageman
/tests/components/awair/ @ahayworth @ricohageman
@@ -1786,8 +1784,6 @@ build.json @home-assistant/supervisor
/tests/components/ukraine_alarm/ @PaulAnnekov
/homeassistant/components/unifi/ @Kane610
/tests/components/unifi/ @Kane610
/homeassistant/components/unifi_access/ @imhotep @RaHehl
/tests/components/unifi_access/ @imhotep @RaHehl
/homeassistant/components/unifi_direct/ @tofuSCHNITZEL
/homeassistant/components/unifiled/ @florisvdk
/homeassistant/components/unifiprotect/ @RaHehl

View File

@@ -2,7 +2,6 @@
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.condition import (
Condition,
EntityStateConditionBase,
@@ -44,7 +43,7 @@ def make_entity_state_required_features_condition(
class CustomCondition(EntityStateRequiredFeaturesCondition):
"""Condition for entity state changes."""
_domain_specs = {domain: DomainSpec()}
_domain = domain
_states = {to_state}
_required_features = required_features

View File

@@ -2,7 +2,6 @@
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.entity import get_supported_features
from homeassistant.helpers.trigger import (
EntityTargetStateTriggerBase,
@@ -45,7 +44,7 @@ def make_entity_state_trigger_required_features(
class CustomTrigger(EntityStateTriggerRequiredFeatures):
"""Trigger for entity state changes."""
_domain_specs = {domain: DomainSpec()}
_domains = {domain}
_to_states = {to_state}
_required_features = required_features

View File

@@ -1,53 +0,0 @@
"""The Autoskope integration."""
from __future__ import annotations
import aiohttp
from autoskope_client.api import AutoskopeApi
from autoskope_client.models import CannotConnect, InvalidAuth
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_create_clientsession
from .const import DEFAULT_HOST
from .coordinator import AutoskopeConfigEntry, AutoskopeDataUpdateCoordinator
PLATFORMS: list[Platform] = [Platform.DEVICE_TRACKER]
async def async_setup_entry(hass: HomeAssistant, entry: AutoskopeConfigEntry) -> bool:
"""Set up Autoskope from a config entry."""
session = async_create_clientsession(hass, cookie_jar=aiohttp.CookieJar())
api = AutoskopeApi(
host=entry.data.get(CONF_HOST, DEFAULT_HOST),
username=entry.data[CONF_USERNAME],
password=entry.data[CONF_PASSWORD],
session=session,
)
try:
await api.connect()
except InvalidAuth as err:
# Raise ConfigEntryError until reauth flow is implemented (then ConfigEntryAuthFailed)
raise ConfigEntryError(
"Authentication failed, please check credentials"
) from err
except CannotConnect as err:
raise ConfigEntryNotReady("Could not connect to Autoskope API") from err
coordinator = AutoskopeDataUpdateCoordinator(hass, api, entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: AutoskopeConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,89 +0,0 @@
"""Config flow for the Autoskope integration."""
from __future__ import annotations
from typing import Any
from autoskope_client.api import AutoskopeApi
from autoskope_client.models import CannotConnect, InvalidAuth
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.data_entry_flow import section
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.selector import (
TextSelector,
TextSelectorConfig,
TextSelectorType,
)
from .const import DEFAULT_HOST, DOMAIN, SECTION_ADVANCED_SETTINGS
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_USERNAME): str,
vol.Required(CONF_PASSWORD): TextSelector(
TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
vol.Required(SECTION_ADVANCED_SETTINGS): section(
vol.Schema(
{
vol.Required(CONF_HOST, default=DEFAULT_HOST): TextSelector(
TextSelectorConfig(type=TextSelectorType.URL)
),
}
),
{"collapsed": True},
),
}
)
class AutoskopeConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Autoskope."""
VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
username = user_input[CONF_USERNAME].lower()
host = user_input[SECTION_ADVANCED_SETTINGS][CONF_HOST].lower()
try:
cv.url(host)
except vol.Invalid:
errors["base"] = "invalid_url"
if not errors:
await self.async_set_unique_id(f"{username}@{host}")
self._abort_if_unique_id_configured()
try:
async with AutoskopeApi(
host=host,
username=username,
password=user_input[CONF_PASSWORD],
):
pass
except CannotConnect:
errors["base"] = "cannot_connect"
except InvalidAuth:
errors["base"] = "invalid_auth"
else:
return self.async_create_entry(
title=f"Autoskope ({username})",
data={
CONF_USERNAME: username,
CONF_PASSWORD: user_input[CONF_PASSWORD],
CONF_HOST: host,
},
)
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
)

View File

@@ -1,9 +0,0 @@
"""Constants for the Autoskope integration."""
from datetime import timedelta
DOMAIN = "autoskope"
DEFAULT_HOST = "https://portal.autoskope.de"
SECTION_ADVANCED_SETTINGS = "advanced_settings"
UPDATE_INTERVAL = timedelta(seconds=60)

View File

@@ -1,60 +0,0 @@
"""Data update coordinator for the Autoskope integration."""
from __future__ import annotations
import logging
from autoskope_client.api import AutoskopeApi
from autoskope_client.models import CannotConnect, InvalidAuth, Vehicle
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, UPDATE_INTERVAL
_LOGGER = logging.getLogger(__name__)
type AutoskopeConfigEntry = ConfigEntry[AutoskopeDataUpdateCoordinator]
class AutoskopeDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Vehicle]]):
"""Class to manage fetching Autoskope data."""
config_entry: AutoskopeConfigEntry
def __init__(
self, hass: HomeAssistant, api: AutoskopeApi, entry: AutoskopeConfigEntry
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=UPDATE_INTERVAL,
config_entry=entry,
)
self.api = api
async def _async_update_data(self) -> dict[str, Vehicle]:
"""Fetch data from API endpoint."""
try:
vehicles = await self.api.get_vehicles()
return {vehicle.id: vehicle for vehicle in vehicles}
except InvalidAuth:
# Attempt to re-authenticate using stored credentials
try:
await self.api.authenticate()
# Retry the request after successful re-authentication
vehicles = await self.api.get_vehicles()
return {vehicle.id: vehicle for vehicle in vehicles}
except InvalidAuth as reauth_err:
raise ConfigEntryAuthFailed(
f"Authentication failed: {reauth_err}"
) from reauth_err
except CannotConnect as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err

View File

@@ -1,145 +0,0 @@
"""Support for Autoskope device tracking."""
from __future__ import annotations
from autoskope_client.constants import MANUFACTURER
from autoskope_client.models import Vehicle
from homeassistant.components.device_tracker import SourceType, TrackerEntity
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import AutoskopeConfigEntry, AutoskopeDataUpdateCoordinator
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,
entry: AutoskopeConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Autoskope device tracker entities."""
coordinator: AutoskopeDataUpdateCoordinator = entry.runtime_data
tracked_vehicles: set[str] = set()
@callback
def update_entities() -> None:
"""Update entities based on coordinator data."""
current_vehicles = set(coordinator.data.keys())
vehicles_to_add = current_vehicles - tracked_vehicles
if vehicles_to_add:
new_entities = [
AutoskopeDeviceTracker(coordinator, vehicle_id)
for vehicle_id in vehicles_to_add
]
tracked_vehicles.update(vehicles_to_add)
async_add_entities(new_entities)
entry.async_on_unload(coordinator.async_add_listener(update_entities))
update_entities()
class AutoskopeDeviceTracker(
CoordinatorEntity[AutoskopeDataUpdateCoordinator], TrackerEntity
):
"""Representation of an Autoskope tracked device."""
_attr_has_entity_name = True
_attr_name: str | None = None
def __init__(
self, coordinator: AutoskopeDataUpdateCoordinator, vehicle_id: str
) -> None:
"""Initialize the TrackerEntity."""
super().__init__(coordinator)
self._vehicle_id = vehicle_id
self._attr_unique_id = vehicle_id
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
if (
self._vehicle_id in self.coordinator.data
and (device_entry := self.device_entry) is not None
and device_entry.name != self._vehicle_data.name
):
device_registry = dr.async_get(self.hass)
device_registry.async_update_device(
device_entry.id, name=self._vehicle_data.name
)
super()._handle_coordinator_update()
@property
def device_info(self) -> DeviceInfo:
"""Return device info for the vehicle."""
vehicle = self.coordinator.data[self._vehicle_id]
return DeviceInfo(
identifiers={(DOMAIN, str(vehicle.id))},
name=vehicle.name,
manufacturer=MANUFACTURER,
model=vehicle.model,
serial_number=vehicle.imei,
)
@property
def available(self) -> bool:
"""Return if entity is available."""
return (
super().available
and self.coordinator.data is not None
and self._vehicle_id in self.coordinator.data
)
@property
def _vehicle_data(self) -> Vehicle:
"""Return the vehicle data for the current entity."""
return self.coordinator.data[self._vehicle_id]
@property
def latitude(self) -> float | None:
"""Return latitude value of the device."""
if (vehicle := self._vehicle_data) and vehicle.position:
return float(vehicle.position.latitude)
return None
@property
def longitude(self) -> float | None:
"""Return longitude value of the device."""
if (vehicle := self._vehicle_data) and vehicle.position:
return float(vehicle.position.longitude)
return None
@property
def source_type(self) -> SourceType:
"""Return the source type of the device."""
return SourceType.GPS
@property
def location_accuracy(self) -> float:
"""Return the location accuracy of the device in meters."""
if (vehicle := self._vehicle_data) and vehicle.gps_quality:
if vehicle.gps_quality > 0:
# HDOP to estimated accuracy in meters
# HDOP of 1-2 = good (5-10m), 2-5 = moderate (10-25m), >5 = poor (>25m)
return float(max(5, int(vehicle.gps_quality * 5.0)))
return 0.0
@property
def icon(self) -> str:
"""Return the icon based on the vehicle's activity."""
if self._vehicle_id not in self.coordinator.data:
return "mdi:car-clock"
vehicle = self._vehicle_data
if vehicle.position:
if vehicle.position.park_mode:
return "mdi:car-brake-parking"
if vehicle.position.speed > 5: # Moving threshold: 5 km/h
return "mdi:car-arrow-right"
return "mdi:car"
return "mdi:car-clock"

View File

@@ -1,11 +0,0 @@
{
"domain": "autoskope",
"name": "Autoskope",
"codeowners": ["@mcisk"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/autoskope",
"integration_type": "hub",
"iot_class": "cloud_polling",
"quality_scale": "bronze",
"requirements": ["autoskope_client==1.4.1"]
}

View File

@@ -1,88 +0,0 @@
# + in comment indicates requirement for quality scale
# - in comment indicates issue to be fixed, not impacting quality scale
rules:
# Bronze
action-setup:
status: exempt
comment: |
Integration does not provide custom services.
appropriate-polling: done
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: |
Integration does not provide custom services.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup: done
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: |
Integration does not provide custom services.
config-entry-unloading: done
docs-configuration-parameters: done
docs-installation-parameters: done
entity-unavailable: done
integration-owner: done
log-when-unavailable: todo
parallel-updates: done
reauthentication-flow:
status: todo
comment: |
Reauthentication flow removed for initial PR, will be added in follow-up.
test-coverage: done
# Gold
devices: done
diagnostics: todo
discovery-update-info:
status: exempt
comment: |
Integration does not use discovery. Autoskope devices use NB-IoT/LTE-M (via IoT SIMs) and LoRaWAN.
discovery:
status: exempt
comment: |
Integration does not use discovery. Autoskope devices use NB-IoT/LTE-M (via IoT SIMs) and LoRaWAN.
docs-data-update: done
docs-examples: done
docs-known-limitations: done
docs-supported-devices: done
docs-supported-functions: done
docs-troubleshooting: done
docs-use-cases: done
dynamic-devices: done
entity-category: done
entity-device-class: done
entity-disabled-by-default:
status: exempt
comment: |
Only one entity type (device_tracker) is created, making this not applicable.
entity-translations: done
exception-translations: done
icon-translations: done
reconfiguration-flow:
status: todo
comment: |
Reconfiguration flow removed for initial PR, will be added in follow-up.
repair-issues: todo
stale-devices: done
# Platinum
async-dependency: done
inject-websession: done
strict-typing:
status: todo
comment: |
Integration needs to be added to .strict-typing file for full compliance.

View File

@@ -1,52 +0,0 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"invalid_url": "Invalid URL",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"user": {
"data": {
"password": "[%key:common::config_flow::data::password%]",
"username": "[%key:common::config_flow::data::username%]"
},
"data_description": {
"password": "The password for your Autoskope account.",
"username": "The username for your Autoskope account."
},
"description": "Enter your Autoskope credentials.",
"sections": {
"advanced_settings": {
"data": {
"host": "API endpoint"
},
"data_description": {
"host": "The URL of your Autoskope API endpoint. Only change this if you use a white-label portal."
},
"name": "Advanced settings"
}
},
"title": "Connect to Autoskope"
}
}
},
"issues": {
"cannot_connect": {
"description": "Home Assistant could not connect to the Autoskope API at {host}. Please check the connection details and ensure the API endpoint is reachable.\n\nError: {error}",
"title": "Failed to connect to Autoskope"
},
"invalid_auth": {
"description": "Authentication with Autoskope failed for user {username}. Please re-authenticate the integration with the correct password.",
"title": "Invalid Autoskope authentication"
},
"low_battery": {
"description": "The battery voltage for vehicle {vehicle_name} ({vehicle_id}) is low ({value}V). Consider checking or replacing the battery.",
"title": "Low vehicle battery ({vehicle_name})"
}
}
}

View File

@@ -2,7 +2,6 @@
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA,
EntityTriggerBase,
@@ -15,7 +14,7 @@ from . import DOMAIN
class ButtonPressedTrigger(EntityTriggerBase):
"""Trigger for button entity presses."""
_domain_specs = {DOMAIN: DomainSpec()}
_domains = {DOMAIN}
_schema = ENTITY_STATE_TRIGGER_SCHEMA
def is_valid_transition(self, from_state: State, to_state: State) -> bool:

View File

@@ -5,14 +5,13 @@ import voluptuous as vol
from homeassistant.const import ATTR_TEMPERATURE, CONF_OPTIONS
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.automation import DomainSpec, NumericalDomainSpec
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST,
EntityTargetStateTriggerBase,
Trigger,
TriggerConfig,
make_entity_numerical_state_changed_trigger,
make_entity_numerical_state_crossed_threshold_trigger,
make_entity_numerical_state_attribute_changed_trigger,
make_entity_numerical_state_attribute_crossed_threshold_trigger,
make_entity_target_state_attribute_trigger,
make_entity_target_state_trigger,
make_entity_transition_trigger,
@@ -36,7 +35,7 @@ HVAC_MODE_CHANGED_TRIGGER_SCHEMA = ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST.extend
class HVACModeChangedTrigger(EntityTargetStateTriggerBase):
"""Trigger for entity state changes."""
_domain_specs = {DOMAIN: DomainSpec()}
_domains = {DOMAIN}
_schema = HVAC_MODE_CHANGED_TRIGGER_SCHEMA
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
@@ -53,17 +52,17 @@ TRIGGERS: dict[str, type[Trigger]] = {
"started_drying": make_entity_target_state_attribute_trigger(
DOMAIN, ATTR_HVAC_ACTION, HVACAction.DRYING
),
"target_humidity_changed": make_entity_numerical_state_changed_trigger(
{DOMAIN: NumericalDomainSpec(value_source=ATTR_HUMIDITY)}
"target_humidity_changed": make_entity_numerical_state_attribute_changed_trigger(
{DOMAIN}, {DOMAIN: ATTR_HUMIDITY}
),
"target_humidity_crossed_threshold": make_entity_numerical_state_crossed_threshold_trigger(
{DOMAIN: NumericalDomainSpec(value_source=ATTR_HUMIDITY)}
"target_humidity_crossed_threshold": make_entity_numerical_state_attribute_crossed_threshold_trigger(
{DOMAIN}, {DOMAIN: ATTR_HUMIDITY}
),
"target_temperature_changed": make_entity_numerical_state_changed_trigger(
{DOMAIN: NumericalDomainSpec(value_source=ATTR_TEMPERATURE)}
"target_temperature_changed": make_entity_numerical_state_attribute_changed_trigger(
{DOMAIN}, {DOMAIN: ATTR_TEMPERATURE}
),
"target_temperature_crossed_threshold": make_entity_numerical_state_crossed_threshold_trigger(
{DOMAIN: NumericalDomainSpec(value_source=ATTR_TEMPERATURE)}
"target_temperature_crossed_threshold": make_entity_numerical_state_attribute_crossed_threshold_trigger(
{DOMAIN}, {DOMAIN: ATTR_TEMPERATURE}
),
"turned_off": make_entity_target_state_trigger(DOMAIN, HVACMode.OFF),
"turned_on": make_entity_transition_trigger(

View File

@@ -1,82 +1,81 @@
"""Provides triggers for covers."""
from dataclasses import dataclass
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant, State, split_entity_id
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.trigger import EntityTriggerBase, Trigger
from homeassistant.helpers.trigger import (
EntityTriggerBase,
Trigger,
get_device_class_or_undefined,
)
from .const import ATTR_IS_CLOSED, DOMAIN, CoverDeviceClass
@dataclass(frozen=True, slots=True)
class CoverDomainSpec(DomainSpec):
"""DomainSpec with a target value for comparison."""
target_value: str | bool | None = None
class CoverTriggerBase(EntityTriggerBase[CoverDomainSpec]):
class CoverTriggerBase(EntityTriggerBase):
"""Base trigger for cover state changes."""
def _get_value(self, state: State) -> str | bool | None:
"""Extract the relevant value from state based on domain spec."""
domain_spec = self._domain_specs[split_entity_id(state.entity_id)[0]]
if domain_spec.value_source is not None:
return state.attributes.get(domain_spec.value_source)
return state.state
_binary_sensor_target_state: str
_cover_is_closed_target_value: bool
_device_classes: dict[str, str]
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities by cover device class."""
entities = super().entity_filter(entities)
return {
entity_id
for entity_id in entities
if get_device_class_or_undefined(self._hass, entity_id)
== self._device_classes[split_entity_id(entity_id)[0]]
}
def is_valid_state(self, state: State) -> bool:
"""Check if the state matches the target cover state."""
domain_spec = self._domain_specs[split_entity_id(state.entity_id)[0]]
return self._get_value(state) == domain_spec.target_value
if split_entity_id(state.entity_id)[0] == DOMAIN:
return (
state.attributes.get(ATTR_IS_CLOSED)
== self._cover_is_closed_target_value
)
return state.state == self._binary_sensor_target_state
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check if the transition is valid for a cover state change."""
if from_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
return False
if (from_value := self._get_value(from_state)) is None:
return False
return from_value != self._get_value(to_state)
if split_entity_id(from_state.entity_id)[0] == DOMAIN:
if (from_is_closed := from_state.attributes.get(ATTR_IS_CLOSED)) is None:
return False
return from_is_closed != to_state.attributes.get(ATTR_IS_CLOSED) # type: ignore[no-any-return]
return from_state.state != to_state.state
def make_cover_opened_trigger(
*, device_classes: dict[str, str]
*, device_classes: dict[str, str], domains: set[str] | None = None
) -> type[CoverTriggerBase]:
"""Create a trigger cover_opened."""
class CoverOpenedTrigger(CoverTriggerBase):
"""Trigger for cover opened state changes."""
_domain_specs = {
domain: CoverDomainSpec(
device_class=dc,
value_source=ATTR_IS_CLOSED if domain == DOMAIN else None,
target_value=False if domain == DOMAIN else STATE_ON,
)
for domain, dc in device_classes.items()
}
_binary_sensor_target_state = STATE_ON
_cover_is_closed_target_value = False
_domains = domains or {DOMAIN}
_device_classes = device_classes
return CoverOpenedTrigger
def make_cover_closed_trigger(
*, device_classes: dict[str, str]
*, device_classes: dict[str, str], domains: set[str] | None = None
) -> type[CoverTriggerBase]:
"""Create a trigger cover_closed."""
class CoverClosedTrigger(CoverTriggerBase):
"""Trigger for cover closed state changes."""
_domain_specs = {
domain: CoverDomainSpec(
device_class=dc,
value_source=ATTR_IS_CLOSED if domain == DOMAIN else None,
target_value=True if domain == DOMAIN else STATE_OFF,
)
for domain, dc in device_classes.items()
}
_binary_sensor_target_state = STATE_OFF
_cover_is_closed_target_value = True
_domains = domains or {DOMAIN}
_device_classes = device_classes
return CoverClosedTrigger

View File

@@ -20,8 +20,14 @@ DEVICE_CLASSES_DOOR: dict[str, str] = {
TRIGGERS: dict[str, type[Trigger]] = {
"opened": make_cover_opened_trigger(device_classes=DEVICE_CLASSES_DOOR),
"closed": make_cover_closed_trigger(device_classes=DEVICE_CLASSES_DOOR),
"opened": make_cover_opened_trigger(
device_classes=DEVICE_CLASSES_DOOR,
domains={BINARY_SENSOR_DOMAIN, COVER_DOMAIN},
),
"closed": make_cover_closed_trigger(
device_classes=DEVICE_CLASSES_DOOR,
domains={BINARY_SENSOR_DOMAIN, COVER_DOMAIN},
),
}

View File

@@ -20,8 +20,14 @@ DEVICE_CLASSES_GARAGE_DOOR: dict[str, str] = {
TRIGGERS: dict[str, type[Trigger]] = {
"opened": make_cover_opened_trigger(device_classes=DEVICE_CLASSES_GARAGE_DOOR),
"closed": make_cover_closed_trigger(device_classes=DEVICE_CLASSES_GARAGE_DOOR),
"opened": make_cover_opened_trigger(
device_classes=DEVICE_CLASSES_GARAGE_DOOR,
domains={BINARY_SENSOR_DOMAIN, COVER_DOMAIN},
),
"closed": make_cover_closed_trigger(
device_classes=DEVICE_CLASSES_GARAGE_DOOR,
domains={BINARY_SENSOR_DOMAIN, COVER_DOMAIN},
),
}

View File

@@ -10,11 +10,7 @@ from functools import partial, wraps
import logging
from typing import Any, Concatenate
from aiohasupervisor import (
AddonNotSupportedError,
SupervisorError,
SupervisorNotFoundError,
)
from aiohasupervisor import SupervisorError
from aiohasupervisor.models import (
AddonsOptions,
AddonState as SupervisorAddonState,
@@ -169,7 +165,15 @@ class AddonManager:
)
addon_info = await self._supervisor_client.addons.addon_info(self.addon_slug)
return self._async_convert_installed_addon_info(addon_info)
addon_state = self.async_get_addon_state(addon_info)
return AddonInfo(
available=addon_info.available,
hostname=addon_info.hostname,
options=addon_info.options,
state=addon_state,
update_available=addon_info.update_available,
version=addon_info.version,
)
@callback
def async_get_addon_state(self, addon_info: InstalledAddonComplete) -> AddonState:
@@ -185,20 +189,6 @@ class AddonManager:
return addon_state
@callback
def _async_convert_installed_addon_info(
self, addon_info: InstalledAddonComplete
) -> AddonInfo:
"""Convert InstalledAddonComplete model to AddonInfo model."""
return AddonInfo(
available=addon_info.available,
hostname=addon_info.hostname,
options=addon_info.options,
state=self.async_get_addon_state(addon_info),
update_available=addon_info.update_available,
version=addon_info.version,
)
@api_error(
"Failed to set the {addon_name} app options",
expected_error_type=SupervisorError,
@@ -209,17 +199,21 @@ class AddonManager:
self.addon_slug, AddonsOptions(config=config)
)
def _check_addon_available(self, addon_info: AddonInfo) -> None:
"""Check if the managed add-on is available."""
if not addon_info.available:
raise AddonError(f"{self.addon_name} app is not available")
@api_error(
"Failed to install the {addon_name} app", expected_error_type=SupervisorError
)
async def async_install_addon(self) -> None:
"""Install the managed add-on."""
try:
await self._supervisor_client.store.install_addon(self.addon_slug)
except AddonNotSupportedError as err:
raise AddonError(
f"{self.addon_name} app is not available: {err!s}"
) from None
addon_info = await self.async_get_addon_info()
self._check_addon_available(addon_info)
await self._supervisor_client.store.install_addon(self.addon_slug)
@api_error(
"Failed to uninstall the {addon_name} app",
@@ -232,29 +226,17 @@ class AddonManager:
@api_error("Failed to update the {addon_name} app")
async def async_update_addon(self) -> None:
"""Update the managed add-on if needed."""
try:
# Not using async_get_addon_info here because it would make an unnecessary
# call to /store/addon/{slug}/info. This will raise if the addon is not
# installed so one call to /addon/{slug}/info is all that is needed
addon_info = await self._supervisor_client.addons.addon_info(
self.addon_slug
)
except SupervisorNotFoundError:
raise AddonError(f"{self.addon_name} app is not installed") from None
addon_info = await self.async_get_addon_info()
self._check_addon_available(addon_info)
if addon_info.state is AddonState.NOT_INSTALLED:
raise AddonError(f"{self.addon_name} app is not installed")
if not addon_info.update_available:
return
try:
await self._supervisor_client.store.addon_availability(self.addon_slug)
except AddonNotSupportedError as err:
raise AddonError(
f"{self.addon_name} app is not available: {err!s}"
) from None
await self.async_create_backup(
addon_info=self._async_convert_installed_addon_info(addon_info)
)
await self.async_create_backup()
await self._supervisor_client.store.update_addon(
self.addon_slug, StoreAddonUpdate(backup=False)
)
@@ -284,14 +266,10 @@ class AddonManager:
"Failed to create a backup of the {addon_name} app",
expected_error_type=SupervisorError,
)
async def async_create_backup(self, *, addon_info: AddonInfo | None = None) -> None:
async def async_create_backup(self) -> None:
"""Create a partial backup of the managed add-on."""
if addon_info:
addon_version = addon_info.version
else:
addon_version = (await self.async_get_addon_info()).version
name = f"addon_{self.addon_slug}_{addon_version}"
addon_info = await self.async_get_addon_info()
name = f"addon_{self.addon_slug}_{addon_info.version}"
self._logger.debug("Creating backup: %s", name)
await self._supervisor_client.backups.partial_backup(

View File

@@ -15,43 +15,50 @@ from homeassistant.components.weather import (
ATTR_WEATHER_HUMIDITY,
DOMAIN as WEATHER_DOMAIN,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.automation import NumericalDomainSpec
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers.trigger import (
EntityNumericalStateAttributeChangedTriggerBase,
EntityNumericalStateAttributeCrossedThresholdTriggerBase,
EntityTriggerBase,
Trigger,
get_device_class_or_undefined,
)
HUMIDITY_DOMAIN_SPECS: dict[str, NumericalDomainSpec] = {
CLIMATE_DOMAIN: NumericalDomainSpec(
value_source=CLIMATE_ATTR_CURRENT_HUMIDITY,
),
HUMIDIFIER_DOMAIN: NumericalDomainSpec(
value_source=HUMIDIFIER_ATTR_CURRENT_HUMIDITY,
),
SENSOR_DOMAIN: NumericalDomainSpec(
device_class=SensorDeviceClass.HUMIDITY,
),
WEATHER_DOMAIN: NumericalDomainSpec(
value_source=ATTR_WEATHER_HUMIDITY,
),
}
class _HumidityTriggerMixin(EntityTriggerBase):
"""Mixin for humidity triggers providing entity filtering and value extraction."""
_attributes = {
CLIMATE_DOMAIN: CLIMATE_ATTR_CURRENT_HUMIDITY,
HUMIDIFIER_DOMAIN: HUMIDIFIER_ATTR_CURRENT_HUMIDITY,
SENSOR_DOMAIN: None, # Use state.state
WEATHER_DOMAIN: ATTR_WEATHER_HUMIDITY,
}
_domains = {SENSOR_DOMAIN, CLIMATE_DOMAIN, HUMIDIFIER_DOMAIN, WEATHER_DOMAIN}
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities: all climate/humidifier/weather, sensor only with device_class humidity."""
entities = super().entity_filter(entities)
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] != SENSOR_DOMAIN
or get_device_class_or_undefined(self._hass, entity_id)
== SensorDeviceClass.HUMIDITY
}
class HumidityChangedTrigger(EntityNumericalStateAttributeChangedTriggerBase):
class HumidityChangedTrigger(
_HumidityTriggerMixin, EntityNumericalStateAttributeChangedTriggerBase
):
"""Trigger for humidity value changes across multiple domains."""
_domain_specs = HUMIDITY_DOMAIN_SPECS
class HumidityCrossedThresholdTrigger(
EntityNumericalStateAttributeCrossedThresholdTriggerBase
_HumidityTriggerMixin, EntityNumericalStateAttributeCrossedThresholdTriggerBase
):
"""Trigger for humidity value crossing a threshold across multiple domains."""
_domain_specs = HUMIDITY_DOMAIN_SPECS
TRIGGERS: dict[str, type[Trigger]] = {
"changed": HumidityChangedTrigger,

View File

@@ -74,7 +74,7 @@ class IntelliClimaVMCFan(IntelliClimaECOEntity, FanEntity):
"""Return the current speed percentage."""
device_data = self._device_data
if device_data.speed_set == FanSpeed.auto_get:
if device_data.speed_set == FanSpeed.auto:
return None
return ranged_value_to_percentage(self._speed_range, int(device_data.speed_set))
@@ -92,7 +92,7 @@ class IntelliClimaVMCFan(IntelliClimaECOEntity, FanEntity):
if device_data.mode_set == FanMode.off:
return None
if (
device_data.speed_set == FanSpeed.auto_get
device_data.speed_set == FanSpeed.auto
and device_data.mode_set == FanMode.sensor
):
return "auto"
@@ -111,7 +111,7 @@ class IntelliClimaVMCFan(IntelliClimaECOEntity, FanEntity):
infinitely.
"""
percentage = 25 if percentage == 0 else percentage
await self.async_set_mode_speed(preset_mode=preset_mode, percentage=percentage)
await self.async_set_mode_speed(fan_mode=preset_mode, percentage=percentage)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn off the fan."""
@@ -124,10 +124,10 @@ class IntelliClimaVMCFan(IntelliClimaECOEntity, FanEntity):
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set preset mode."""
await self.async_set_mode_speed(preset_mode=preset_mode)
await self.async_set_mode_speed(fan_mode=preset_mode)
async def async_set_mode_speed(
self, preset_mode: str | None = None, percentage: int | None = None
self, fan_mode: str | None = None, percentage: int | None = None
) -> None:
"""Set mode and speed.
@@ -137,7 +137,7 @@ class IntelliClimaVMCFan(IntelliClimaECOEntity, FanEntity):
percentage = self.percentage if percentage is None else percentage
percentage = 25 if percentage is None else percentage
if preset_mode == "auto":
if fan_mode == "auto":
# auto is a special case with special mode and speed setting
await self.coordinator.api.ecocomfort.set_mode_speed_auto(self._device_sn)
await self.coordinator.async_request_refresh()
@@ -148,20 +148,21 @@ class IntelliClimaVMCFan(IntelliClimaECOEntity, FanEntity):
return
# Determine the fan mode
if not self.is_on:
if fan_mode is not None:
# Set to requested fan_mode
mode = fan_mode
elif not self.is_on:
# Default to alternate fan mode if not turned on
mode = FanMode.alternate
else:
# Maintain current mode
mode = self._device_data.mode_set
speed = FanSpeed(
str(
math.ceil(
percentage_to_ranged_value(
self._speed_range,
percentage,
)
speed = str(
math.ceil(
percentage_to_ranged_value(
self._speed_range,
percentage,
)
)
)

View File

@@ -7,5 +7,5 @@
"integration_type": "device",
"iot_class": "cloud_polling",
"quality_scale": "bronze",
"requirements": ["pyintelliclima==0.3.1"]
"requirements": ["pyintelliclima==0.2.2"]
}

View File

@@ -68,12 +68,12 @@ class IntelliClimaVMCFanModeSelect(IntelliClimaECOEntity, SelectEntity):
# If in auto mode (sensor mode with auto speed), return None (handled by fan entity preset mode)
if (
device_data.speed_set == FanSpeed.auto_get
device_data.speed_set == FanSpeed.auto
and device_data.mode_set == FanMode.sensor
):
return None
return INTELLICLIMA_MODE_TO_FAN_MODE.get(device_data.mode_set)
return INTELLICLIMA_MODE_TO_FAN_MODE.get(FanMode(device_data.mode_set))
async def async_select_option(self, option: str) -> None:
"""Set the fan mode."""
@@ -83,7 +83,7 @@ class IntelliClimaVMCFanModeSelect(IntelliClimaECOEntity, SelectEntity):
# Determine speed: keep current speed if available, otherwise default to sleep
if (
device_data.speed_set == FanSpeed.auto_get
device_data.speed_set == FanSpeed.auto
or device_data.mode_set == FanMode.off
):
speed = FanSpeed.sleep

View File

@@ -6,7 +6,7 @@
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"no_devices": "No supported IntelliClima devices were found in your account",
"no_devices": "No IntelliClima devices found in your account",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {

View File

@@ -4,7 +4,6 @@ from typing import Any
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.automation import NumericalDomainSpec
from homeassistant.helpers.trigger import (
EntityNumericalStateAttributeChangedTriggerBase,
EntityNumericalStateAttributeCrossedThresholdTriggerBase,
@@ -21,18 +20,13 @@ def _convert_uint8_to_percentage(value: Any) -> float:
return (float(value) / 255.0) * 100.0
BRIGHTNESS_DOMAIN_SPECS = {
DOMAIN: NumericalDomainSpec(
value_source=ATTR_BRIGHTNESS,
value_converter=_convert_uint8_to_percentage,
),
}
class BrightnessChangedTrigger(EntityNumericalStateAttributeChangedTriggerBase):
"""Trigger for brightness changed."""
_domain_specs = BRIGHTNESS_DOMAIN_SPECS
_domains = {DOMAIN}
_attributes = {DOMAIN: ATTR_BRIGHTNESS}
_converter = staticmethod(_convert_uint8_to_percentage)
class BrightnessCrossedThresholdTrigger(
@@ -40,7 +34,9 @@ class BrightnessCrossedThresholdTrigger(
):
"""Trigger for brightness crossed threshold."""
_domain_specs = BRIGHTNESS_DOMAIN_SPECS
_domains = {DOMAIN}
_attributes = {DOMAIN: ATTR_BRIGHTNESS}
_converter = staticmethod(_convert_uint8_to_percentage)
TRIGGERS: dict[str, type[Trigger]] = {

View File

@@ -20,8 +20,6 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import LitterRobotConfigEntry
from .entity import LitterRobotEntity, _WhiskerEntityT
PARALLEL_UPDATES = 0
@dataclass(frozen=True, kw_only=True)
class RobotBinarySensorEntityDescription(

View File

@@ -14,9 +14,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import LitterRobotConfigEntry
from .entity import LitterRobotEntity, _WhiskerEntityT, whisker_command
PARALLEL_UPDATES = 1
from .entity import LitterRobotEntity, _WhiskerEntityT
@dataclass(frozen=True, kw_only=True)
@@ -73,7 +71,6 @@ class LitterRobotButtonEntity(LitterRobotEntity[_WhiskerEntityT], ButtonEntity):
entity_description: RobotButtonEntityDescription[_WhiskerEntityT]
@whisker_command
async def async_press(self) -> None:
"""Press the button."""
await self.entity_description.press_fn(self.robot)

View File

@@ -46,18 +46,11 @@ class LitterRobotDataUpdateCoordinator(DataUpdateCoordinator[None]):
async def _async_update_data(self) -> None:
"""Update all device states from the Litter-Robot API."""
try:
await self.account.refresh_robots()
await self.account.load_pets()
for pet in self.account.pets:
# Need to fetch weight history for `get_visits_since`
await pet.fetch_weight_history()
except LitterRobotLoginException as ex:
raise ConfigEntryAuthFailed("Invalid credentials") from ex
except LitterRobotException as ex:
raise UpdateFailed(
f"Unable to fetch data from the Whisker API: {ex}"
) from ex
await self.account.refresh_robots()
await self.account.load_pets()
for pet in self.account.pets:
# Need to fetch weight history for `get_visits_since`
await pet.fetch_weight_history()
async def _async_setup(self) -> None:
"""Set up the coordinator."""

View File

@@ -1,24 +0,0 @@
"""Diagnostics support for Litter-Robot."""
from __future__ import annotations
from typing import Any
from pylitterbot.utils import REDACT_FIELDS
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.core import HomeAssistant
from .coordinator import LitterRobotConfigEntry
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: LitterRobotConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
account = entry.runtime_data.account
data = {
"robots": [robot.to_dict() for robot in account.robots],
"pets": [pet.to_dict() for pet in account.pets],
}
return async_redact_data(data, REDACT_FIELDS)

View File

@@ -2,14 +2,11 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable, Coroutine
from typing import Any, Concatenate, Generic, TypeVar
from typing import Generic, TypeVar
from pylitterbot import Pet, Robot
from pylitterbot.exceptions import LitterRobotException
from pylitterbot.robot import EVENT_UPDATE
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import EntityDescription
from homeassistant.helpers.update_coordinator import CoordinatorEntity
@@ -20,26 +17,6 @@ from .coordinator import LitterRobotDataUpdateCoordinator
_WhiskerEntityT = TypeVar("_WhiskerEntityT", bound=Robot | Pet)
def whisker_command[_WhiskerEntityT2: LitterRobotEntity, **_P](
func: Callable[Concatenate[_WhiskerEntityT2, _P], Awaitable[None]],
) -> Callable[Concatenate[_WhiskerEntityT2, _P], Coroutine[Any, Any, None]]:
"""Wrap a Whisker command to handle exceptions."""
async def handler(
self: _WhiskerEntityT2, *args: _P.args, **kwargs: _P.kwargs
) -> None:
try:
await func(self, *args, **kwargs)
except LitterRobotException as ex:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="command_failed",
translation_placeholders={"error": str(ex)},
) from ex
return handler
def get_device_info(whisker_entity: Robot | Pet) -> DeviceInfo:
"""Get device info for a robot or pet."""
if isinstance(whisker_entity, Robot):

View File

@@ -23,16 +23,16 @@ rules:
unique-config-entry: done
# Silver
action-exceptions: done
action-exceptions: todo
config-entry-unloading: done
docs-configuration-parameters:
status: done
comment: No options to configure
docs-installation-parameters: done
entity-unavailable: done
entity-unavailable: todo
integration-owner: done
log-when-unavailable: done
parallel-updates: done
log-when-unavailable: todo
parallel-updates: todo
reauthentication-flow: done
test-coverage:
status: todo
@@ -42,7 +42,7 @@ rules:
# Gold
devices: done
diagnostics: done
diagnostics: todo
discovery-update-info:
status: done
comment: The integration is cloud-based

View File

@@ -15,9 +15,7 @@ from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import LitterRobotConfigEntry, LitterRobotDataUpdateCoordinator
from .entity import LitterRobotEntity, _WhiskerEntityT, whisker_command
PARALLEL_UPDATES = 1
from .entity import LitterRobotEntity, _WhiskerEntityT
_CastTypeT = TypeVar("_CastTypeT", int, float, str)
@@ -156,7 +154,6 @@ class LitterRobotSelectEntity(
"""Return the selected entity option to represent the entity state."""
return str(self.entity_description.current_fn(self.robot))
@whisker_command
async def async_select_option(self, option: str) -> None:
"""Change the selected option."""
await self.entity_description.select_fn(self.robot, option)

View File

@@ -23,8 +23,6 @@ from homeassistant.util import dt as dt_util
from .coordinator import LitterRobotConfigEntry
from .entity import LitterRobotEntity, _WhiskerEntityT
PARALLEL_UPDATES = 0
def icon_for_gauge_level(gauge_level: int | None = None, offset: int = 0) -> str:
"""Return a gauge icon valid identifier."""

View File

@@ -195,14 +195,6 @@
}
}
},
"exceptions": {
"command_failed": {
"message": "An error occurred while communicating with the device: {error}"
},
"firmware_update_failed": {
"message": "Unable to start firmware update on {name}"
}
},
"issues": {
"deprecated_entity": {
"description": "The Litter-Robot entity `{entity}` is deprecated and will be removed in a future release.\nPlease update your dashboards, automations and scripts, disable `{entity}` and reload the integration/restart Home Assistant to fix this issue.",

View File

@@ -25,9 +25,7 @@ from homeassistant.helpers.issue_registry import (
from .const import DOMAIN
from .coordinator import LitterRobotConfigEntry
from .entity import LitterRobotEntity, _WhiskerEntityT, whisker_command
PARALLEL_UPDATES = 1
from .entity import LitterRobotEntity, _WhiskerEntityT
@dataclass(frozen=True, kw_only=True)
@@ -137,12 +135,10 @@ class RobotSwitchEntity(LitterRobotEntity[_WhiskerEntityT], SwitchEntity):
"""Return true if switch is on."""
return self.entity_description.value_fn(self.robot)
@whisker_command
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the switch on."""
await self.entity_description.set_fn(self.robot, True)
@whisker_command
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the switch off."""
await self.entity_description.set_fn(self.robot, False)

View File

@@ -16,9 +16,7 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util import dt as dt_util
from .coordinator import LitterRobotConfigEntry
from .entity import LitterRobotEntity, _WhiskerEntityT, whisker_command
PARALLEL_UPDATES = 1
from .entity import LitterRobotEntity, _WhiskerEntityT
@dataclass(frozen=True, kw_only=True)
@@ -76,7 +74,6 @@ class LitterRobotTimeEntity(LitterRobotEntity[_WhiskerEntityT], TimeEntity):
"""Return the value reported by the time."""
return self.entity_description.value_fn(self.robot)
@whisker_command
async def async_set_value(self, value: time) -> None:
"""Update the current value."""
await self.entity_description.set_fn(self.robot, value)

View File

@@ -17,11 +17,8 @@ from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import DOMAIN
from .coordinator import LitterRobotConfigEntry
from .entity import LitterRobotEntity, whisker_command
PARALLEL_UPDATES = 1
from .entity import LitterRobotEntity
SCAN_INTERVAL = timedelta(days=1)
@@ -83,15 +80,11 @@ class RobotUpdateEntity(LitterRobotEntity[LitterRobot4], UpdateEntity):
latest_version = self.robot.firmware
self._attr_latest_version = latest_version
@whisker_command
async def async_install(
self, version: str | None, backup: bool, **kwargs: Any
) -> None:
"""Install an update."""
if await self.robot.has_firmware_update(True):
if not await self.robot.update_firmware():
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="firmware_update_failed",
translation_placeholders={"name": self.robot.name},
)
message = f"Unable to start firmware update on {self.robot.name}"
raise HomeAssistantError(message)

View File

@@ -19,9 +19,7 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util import dt as dt_util
from .coordinator import LitterRobotConfigEntry
from .entity import LitterRobotEntity, whisker_command
PARALLEL_UPDATES = 1
from .entity import LitterRobotEntity
LITTER_BOX_STATUS_STATE_MAP = {
LitterBoxStatus.CLEAN_CYCLE: VacuumActivity.CLEANING,
@@ -68,18 +66,15 @@ class LitterRobotCleaner(LitterRobotEntity[LitterRobot], StateVacuumEntity):
"""Return the state of the cleaner."""
return LITTER_BOX_STATUS_STATE_MAP.get(self.robot.status, VacuumActivity.ERROR)
@whisker_command
async def async_start(self) -> None:
"""Start a clean cycle."""
await self.robot.set_power_status(True)
await self.robot.start_cleaning()
@whisker_command
async def async_stop(self, **kwargs: Any) -> None:
"""Stop the vacuum cleaner."""
await self.robot.set_power_status(False)
@whisker_command
async def async_set_sleep_mode(
self, enabled: bool, start_time: str | None = None
) -> None:

View File

@@ -187,27 +187,6 @@ DISCOVERY_SCHEMAS = [
# allow None value to account for 'default' value
allow_none_value=True,
),
MatterDiscoverySchema(
platform=Platform.NUMBER,
entity_description=MatterNumberEntityDescription(
key="power_on_level",
entity_category=EntityCategory.CONFIG,
translation_key="power_on_level",
native_max_value=255,
native_min_value=0,
mode=NumberMode.BOX,
# use 255 to indicate that the value should revert to the default
device_to_ha=lambda x: 255 if x is None else x,
ha_to_device=lambda x: None if x == 255 else int(x),
native_step=1,
native_unit_of_measurement=None,
),
entity_class=MatterNumber,
required_attributes=(clusters.LevelControl.Attributes.StartUpCurrentLevel,),
not_device_type=(device_types.Speaker,),
# allow None value to account for 'default' value
allow_none_value=True,
),
MatterDiscoverySchema(
platform=Platform.NUMBER,
entity_description=MatterNumberEntityDescription(

View File

@@ -238,9 +238,6 @@
"on_transition_time": {
"name": "On transition time"
},
"power_on_level": {
"name": "Power-on level"
},
"pump_setpoint": {
"name": "Setpoint"
},
@@ -325,11 +322,11 @@
}
},
"startup_on_off": {
"name": "Power-on behavior",
"name": "Power-on behavior on startup",
"state": {
"off": "[%key:common::state::off%]",
"on": "[%key:common::state::on%]",
"previous": "Previous state",
"previous": "Previous",
"toggle": "[%key:common::action::toggle%]"
}
},

View File

@@ -6,20 +6,28 @@ from homeassistant.components.binary_sensor import (
)
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.trigger import (
EntityTargetStateTriggerBase,
EntityTriggerBase,
Trigger,
get_device_class_or_undefined,
)
class _MotionBinaryTriggerBase(EntityTriggerBase):
"""Base trigger for motion binary sensor state changes."""
_domain_specs = {
BINARY_SENSOR_DOMAIN: DomainSpec(device_class=BinarySensorDeviceClass.MOTION)
}
_domains = {BINARY_SENSOR_DOMAIN}
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities by motion device class."""
entities = super().entity_filter(entities)
return {
entity_id
for entity_id in entities
if get_device_class_or_undefined(self._hass, entity_id)
== BinarySensorDeviceClass.MOTION
}
class MotionDetectedTrigger(_MotionBinaryTriggerBase, EntityTargetStateTriggerBase):

View File

@@ -6,20 +6,28 @@ from homeassistant.components.binary_sensor import (
)
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.trigger import (
EntityTargetStateTriggerBase,
EntityTriggerBase,
Trigger,
get_device_class_or_undefined,
)
class _OccupancyBinaryTriggerBase(EntityTriggerBase):
"""Base trigger for occupancy binary sensor state changes."""
_domain_specs = {
BINARY_SENSOR_DOMAIN: DomainSpec(device_class=BinarySensorDeviceClass.OCCUPANCY)
}
_domains = {BINARY_SENSOR_DOMAIN}
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities by occupancy device class."""
entities = super().entity_filter(entities)
return {
entity_id
for entity_id in entities
if get_device_class_or_undefined(self._hass, entity_id)
== BinarySensorDeviceClass.OCCUPANCY
}
class OccupancyDetectedTrigger(

View File

@@ -14,11 +14,13 @@ from .coordinator import PranaConfigEntry, PranaCoordinator
_LOGGER = logging.getLogger(__name__)
PLATFORMS = [Platform.FAN, Platform.SWITCH]
# Keep platforms sorted alphabetically to satisfy lint rule
PLATFORMS = [Platform.SWITCH]
async def async_setup_entry(hass: HomeAssistant, entry: PranaConfigEntry) -> bool:
"""Set up Prana from a config entry."""
coordinator = PranaCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator

View File

@@ -1,186 +0,0 @@
"""Fan platform for Prana integration."""
from collections.abc import Callable
from dataclasses import dataclass
import math
from typing import Any
from prana_local_api_client.models.prana_state import FanState
from homeassistant.components.fan import (
FanEntity,
FanEntityDescription,
FanEntityFeature,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util.percentage import (
percentage_to_ranged_value,
ranged_value_to_percentage,
)
from homeassistant.util.scaling import int_states_in_range
from . import PranaConfigEntry
from .entity import PranaBaseEntity, PranaCoordinator, PranaEntityDescription, StrEnum
PARALLEL_UPDATES = 1
# The Prana device API expects fan speed values in scaled units (tenths of a speed step)
# rather than the raw step value used internally by this integration. This factor is
# applied when sending speeds to the API to match its expected units.
PRANA_SPEED_MULTIPLIER = 10
class PranaFanType(StrEnum):
"""Enumerates Prana fan types exposed by the device API."""
SUPPLY = "supply"
EXTRACT = "extract"
BOUNDED = "bounded"
@dataclass(frozen=True, kw_only=True)
class PranaFanEntityDescription(FanEntityDescription, PranaEntityDescription):
"""Description of a Prana fan entity."""
value_fn: Callable[[PranaCoordinator], FanState]
speed_range: Callable[[PranaCoordinator], tuple[int, int]]
ENTITIES: tuple[PranaEntityDescription, ...] = (
PranaFanEntityDescription(
key=PranaFanType.SUPPLY,
translation_key="supply",
value_fn=lambda coord: (
coord.data.supply if not coord.data.bound else coord.data.bounded
),
speed_range=lambda coord: (
1,
coord.data.supply.max_speed
if not coord.data.bound
else coord.data.bounded.max_speed,
),
),
PranaFanEntityDescription(
key=PranaFanType.EXTRACT,
translation_key="extract",
value_fn=lambda coord: (
coord.data.extract if not coord.data.bound else coord.data.bounded
),
speed_range=lambda coord: (
1,
coord.data.extract.max_speed
if not coord.data.bound
else coord.data.bounded.max_speed,
),
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: PranaConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Prana fan entities from a config entry."""
async_add_entities(
PranaFan(entry.runtime_data, entity_description)
for entity_description in ENTITIES
)
class PranaFan(PranaBaseEntity, FanEntity):
"""Representation of a Prana fan entity."""
entity_description: PranaFanEntityDescription
_attr_preset_modes = ["night", "boost"]
_attr_supported_features = (
FanEntityFeature.SET_SPEED
| FanEntityFeature.TURN_ON
| FanEntityFeature.TURN_OFF
| FanEntityFeature.PRESET_MODE
)
@property
def _api_target_key(self) -> str:
"""Return the correct target key for API commands based on bounded state."""
# If the device is in bound mode, both supply and extract fans control the same bounded fan speeds.
if self.coordinator.data.bound:
return PranaFanType.BOUNDED
# Otherwise, return the specific fan type (supply or extract) for API commands.
return self.entity_description.key
@property
def speed_count(self) -> int:
"""Return the number of speeds the fan supports."""
return int_states_in_range(
self.entity_description.speed_range(self.coordinator)
)
@property
def percentage(self) -> int | None:
"""Return the current fan speed percentage."""
current_speed = self.entity_description.value_fn(self.coordinator).speed
return ranged_value_to_percentage(
self.entity_description.speed_range(self.coordinator), current_speed
)
async def async_set_percentage(self, percentage: int) -> None:
"""Set fan speed (0-100%) by converting to device-specific speed steps."""
if percentage == 0:
await self.async_turn_off()
return
await self.coordinator.api_client.set_speed(
math.ceil(
percentage_to_ranged_value(
self.entity_description.speed_range(self.coordinator),
percentage,
)
)
* PRANA_SPEED_MULTIPLIER,
self._api_target_key,
)
await self.coordinator.async_refresh()
@property
def is_on(self) -> bool:
"""Return true if the fan is on."""
return self.entity_description.value_fn(self.coordinator).is_on
async def async_turn_on(
self,
percentage: int | None = None,
preset_mode: str | None = None,
**kwargs: Any,
) -> None:
"""Turn the fan on and optionally set speed or preset mode."""
if percentage == 0:
await self.async_turn_off()
return
await self.coordinator.api_client.set_speed_is_on(True, self._api_target_key)
if percentage is not None:
await self.async_set_percentage(percentage)
if preset_mode is not None:
await self.async_set_preset_mode(preset_mode)
if percentage is None and preset_mode is None:
await self.coordinator.async_refresh()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the fan off."""
await self.coordinator.api_client.set_speed_is_on(False, self._api_target_key)
await self.coordinator.async_refresh()
async def async_set_preset_mode(self, preset_mode: str) -> None:
"""Set the preset mode (e.g., night or boost)."""
await self.coordinator.api_client.set_switch(preset_mode, True)
await self.coordinator.async_refresh()
@property
def preset_mode(self) -> str | None:
"""Return the current preset mode."""
if self.coordinator.data.night:
return "night"
if self.coordinator.data.boost:
return "boost"
return None

View File

@@ -1,13 +1,5 @@
{
"entity": {
"fan": {
"extract": {
"default": "mdi:arrow-expand-right"
},
"supply": {
"default": "mdi:arrow-expand-left"
}
},
"switch": {
"auto": {
"default": "mdi:fan-auto"

View File

@@ -25,30 +25,6 @@
}
},
"entity": {
"fan": {
"extract": {
"name": "Extract fan",
"state_attributes": {
"preset_mode": {
"state": {
"boost": "Boost",
"night": "Night"
}
}
}
},
"supply": {
"name": "Supply fan",
"state_attributes": {
"preset_mode": {
"state": {
"boost": "[%key:component::prana::entity::fan::extract::state_attributes::preset_mode::state::boost%]",
"night": "[%key:component::prana::entity::fan::extract::state_attributes::preset_mode::state::night%]"
}
}
}
}
},
"switch": {
"auto": {
"name": "Auto"

View File

@@ -2,7 +2,6 @@
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA,
EntityTriggerBase,
@@ -15,7 +14,7 @@ from . import DOMAIN
class SceneActivatedTrigger(EntityTriggerBase):
"""Trigger for scene entity activations."""
_domain_specs = {DOMAIN: DomainSpec()}
_domains = {DOMAIN}
_schema = ENTITY_STATE_TRIGGER_SCHEMA
def is_valid_transition(self, from_state: State, to_state: State) -> bool:

View File

@@ -2,7 +2,6 @@
from homeassistant.const import STATE_OFF, STATE_ON, STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.trigger import (
EntityTransitionTriggerBase,
Trigger,
@@ -15,7 +14,7 @@ from .const import ATTR_NEXT_EVENT, DOMAIN
class ScheduleBackToBackTrigger(EntityTransitionTriggerBase):
"""Trigger for back-to-back schedule blocks."""
_domain_specs = {DOMAIN: DomainSpec()}
_domains = {DOMAIN}
_from_states = {STATE_OFF, STATE_ON}
_to_states = {STATE_ON}

View File

@@ -74,11 +74,6 @@ from .const import (
_LOGGER = logging.getLogger(__name__)
def format_zigbee_address(address: str) -> str:
"""Format a zigbee address to be more readable."""
return ":".join(address.lower()[i : i + 2] for i in range(0, 16, 2))
@dataclass
class SmartThingsData:
"""Define an object to hold SmartThings data."""
@@ -495,14 +490,6 @@ def create_devices(
kwargs[ATTR_CONNECTIONS] = {
(dr.CONNECTION_NETWORK_MAC, device.device.hub.mac_address)
}
if device.device.hub.hub_eui:
connections = kwargs.setdefault(ATTR_CONNECTIONS, set())
connections.add(
(
dr.CONNECTION_ZIGBEE,
format_zigbee_address(device.device.hub.hub_eui),
)
)
if device.device.parent_device_id and device.device.parent_device_id in devices:
kwargs[ATTR_VIA_DEVICE] = (DOMAIN, device.device.parent_device_id)
if (ocf := device.device.ocf) is not None:
@@ -526,10 +513,6 @@ def create_devices(
ATTR_SW_VERSION: viper.software_version,
}
)
if (zigbee := device.device.zigbee) is not None:
kwargs[ATTR_CONNECTIONS] = {
(dr.CONNECTION_ZIGBEE, format_zigbee_address(zigbee.eui))
}
if (matter := device.device.matter) is not None:
kwargs.update(
{

View File

@@ -34,5 +34,5 @@
"iot_class": "cloud_push",
"loggers": ["pysmartthings"],
"quality_scale": "bronze",
"requirements": ["pysmartthings==3.7.0"]
"requirements": ["pysmartthings==3.6.0"]
}

View File

@@ -42,7 +42,7 @@
"username": "[%key:common::config_flow::data::username%]"
},
"description": "Please enter your MySubaru credentials\nNOTE: Initial setup may take up to 30 seconds",
"title": "MySubaru Connected Services configuration"
"title": "Subaru Starlink configuration"
}
}
},
@@ -95,7 +95,7 @@
"update_enabled": "Enable vehicle polling"
},
"description": "When enabled, vehicle polling will send a remote command to your vehicle every 2 hours to obtain new sensor data. Without vehicle polling, new sensor data is only received when the vehicle automatically pushes data (normally after engine shutdown).",
"title": "MySubaru Connected Services options"
"title": "Subaru Starlink options"
}
}
},

View File

@@ -2,7 +2,6 @@
from homeassistant.const import STATE_UNAVAILABLE, STATE_UNKNOWN
from homeassistant.core import HomeAssistant, State
from homeassistant.helpers.automation import DomainSpec
from homeassistant.helpers.trigger import (
ENTITY_STATE_TRIGGER_SCHEMA,
EntityTriggerBase,
@@ -15,7 +14,7 @@ from .const import DOMAIN
class TextChangedTrigger(EntityTriggerBase):
"""Trigger for text entity when its content changes."""
_domain_specs = {DOMAIN: DomainSpec()}
_domains = {DOMAIN}
_schema = ENTITY_STATE_TRIGGER_SCHEMA
def is_valid_state(self, state: State) -> bool:

View File

@@ -5,11 +5,11 @@ from __future__ import annotations
from dataclasses import dataclass
from tuya_device_handlers.device_wrapper.base import DeviceWrapper
from tuya_device_handlers.device_wrapper.binary_sensor import (
DPCodeBitmapBitWrapper,
DPCodeInSetWrapper,
from tuya_device_handlers.device_wrapper.binary_sensor import DPCodeBitmapBitWrapper
from tuya_device_handlers.device_wrapper.common import (
DPCodeBooleanWrapper,
DPCodeWrapper,
)
from tuya_device_handlers.device_wrapper.common import DPCodeBooleanWrapper
from tuya_sharing import CustomerDevice, Manager
from homeassistant.components.binary_sensor import (
@@ -376,10 +376,29 @@ BINARY_SENSORS: dict[DeviceCategory, tuple[TuyaBinarySensorEntityDescription, ..
}
class _CustomDPCodeWrapper(DPCodeWrapper[bool]):
"""Custom DPCode Wrapper to check for values in a set."""
_valid_values: set[bool | float | int | str]
def __init__(
self, dpcode: str, valid_values: set[bool | float | int | str]
) -> None:
"""Init CustomDPCodeBooleanWrapper."""
super().__init__(dpcode)
self._valid_values = valid_values
def read_device_status(self, device: CustomerDevice) -> bool | None:
"""Read the device value for the dpcode."""
if (raw_value := device.status.get(self.dpcode)) is None:
return None
return raw_value in self._valid_values
def _get_dpcode_wrapper(
device: CustomerDevice,
description: TuyaBinarySensorEntityDescription,
) -> DeviceWrapper[bool] | None:
) -> DPCodeWrapper | None:
"""Get DPCode wrapper for an entity description."""
dpcode = description.dpcode or description.key
if description.bitmap_key is not None:
@@ -393,7 +412,7 @@ def _get_dpcode_wrapper(
# Legacy / compatibility
if dpcode not in device.status:
return None
return DPCodeInSetWrapper(
return _CustomDPCodeWrapper(
dpcode,
description.on_value
if isinstance(description.on_value, set)

View File

@@ -205,7 +205,7 @@ class _PresetWrapper(DPCodeEnumWrapper):
def read_device_status(self, device: CustomerDevice) -> str | None:
"""Read the device status."""
if (raw := self._read_dpcode_value(device)) not in self.options:
if (raw := self._read_dpcode_value(device)) in TUYA_HVAC_TO_HA:
return None
return raw

View File

@@ -44,7 +44,7 @@
"iot_class": "cloud_push",
"loggers": ["tuya_sharing"],
"requirements": [
"tuya-device-handlers==0.0.12",
"tuya-device-handlers==0.0.11",
"tuya-device-sharing-sdk==0.2.8"
]
}

View File

@@ -490,9 +490,9 @@
}
},
"relay_status": {
"name": "Power-on behavior",
"name": "Power on behavior",
"state": {
"last": "Previous state",
"last": "Remember last state",
"memory": "[%key:component::tuya::entity::select::relay_status::state::last%]",
"off": "[%key:common::state::off%]",
"on": "[%key:common::state::on%]",

View File

@@ -76,7 +76,9 @@ async def async_remove_config_entry_device(
"""Remove config entry from a device."""
hub = config_entry.runtime_data
return not any(
identifier in hub.api.devices for _, identifier in device_entry.connections
identifier
for _, identifier in device_entry.connections
if identifier in hub.api.clients or identifier in hub.api.devices
)

View File

@@ -1,54 +0,0 @@
"""The UniFi Access integration."""
from __future__ import annotations
from unifi_access_api import ApiAuthError, ApiConnectionError, UnifiAccessApiClient
from homeassistant.const import CONF_API_TOKEN, CONF_HOST, CONF_VERIFY_SSL, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .coordinator import UnifiAccessConfigEntry, UnifiAccessCoordinator
PLATFORMS: list[Platform] = [Platform.BUTTON]
async def async_setup_entry(hass: HomeAssistant, entry: UnifiAccessConfigEntry) -> bool:
"""Set up UniFi Access from a config entry."""
session = async_get_clientsession(hass, verify_ssl=entry.data[CONF_VERIFY_SSL])
client = UnifiAccessApiClient(
host=entry.data[CONF_HOST],
api_token=entry.data[CONF_API_TOKEN],
session=session,
verify_ssl=entry.data[CONF_VERIFY_SSL],
)
try:
await client.authenticate()
except ApiAuthError as err:
raise ConfigEntryNotReady(
f"Authentication failed for UniFi Access at {entry.data[CONF_HOST]}"
) from err
except ApiConnectionError as err:
raise ConfigEntryNotReady(
f"Unable to connect to UniFi Access at {entry.data[CONF_HOST]}"
) from err
coordinator = UnifiAccessCoordinator(hass, entry, client)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
entry.async_on_unload(client.close)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(
hass: HomeAssistant, entry: UnifiAccessConfigEntry
) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,52 +0,0 @@
"""Button platform for the UniFi Access integration."""
from __future__ import annotations
from unifi_access_api import ApiError, Door
from homeassistant.components.button import ButtonEntity
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import DOMAIN
from .coordinator import UnifiAccessConfigEntry, UnifiAccessCoordinator
from .entity import UnifiAccessEntity
PARALLEL_UPDATES = 1
async def async_setup_entry(
hass: HomeAssistant,
entry: UnifiAccessConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up UniFi Access button entities."""
coordinator = entry.runtime_data
async_add_entities(
UnifiAccessUnlockButton(coordinator, door) for door in coordinator.data.values()
)
class UnifiAccessUnlockButton(UnifiAccessEntity, ButtonEntity):
"""Representation of a UniFi Access door unlock button."""
_attr_translation_key = "unlock"
def __init__(
self,
coordinator: UnifiAccessCoordinator,
door: Door,
) -> None:
"""Initialize the button entity."""
super().__init__(coordinator, door, "unlock")
async def async_press(self) -> None:
"""Unlock the door."""
try:
await self.coordinator.client.unlock_door(self._door_id)
except ApiError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="unlock_failed",
) from err

View File

@@ -1,68 +0,0 @@
"""Config flow for UniFi Access integration."""
from __future__ import annotations
import logging
from typing import Any
from unifi_access_api import ApiAuthError, ApiConnectionError, UnifiAccessApiClient
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_API_TOKEN, CONF_HOST, CONF_VERIFY_SSL
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class UnifiAccessConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for UniFi Access."""
VERSION = 1
MINOR_VERSION = 1
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
session = async_get_clientsession(
self.hass, verify_ssl=user_input[CONF_VERIFY_SSL]
)
client = UnifiAccessApiClient(
host=user_input[CONF_HOST],
api_token=user_input[CONF_API_TOKEN],
session=session,
verify_ssl=user_input[CONF_VERIFY_SSL],
)
try:
await client.authenticate()
except ApiAuthError:
errors["base"] = "invalid_auth"
except ApiConnectionError:
errors["base"] = "cannot_connect"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
self._async_abort_entries_match({CONF_HOST: user_input[CONF_HOST]})
return self.async_create_entry(
title="UniFi Access",
data=user_input,
)
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_API_TOKEN): str,
vol.Required(CONF_VERIFY_SSL, default=False): bool,
}
),
errors=errors,
)

View File

@@ -1,3 +0,0 @@
"""Constants for the UniFi Access integration."""
DOMAIN = "unifi_access"

View File

@@ -1,128 +0,0 @@
"""Data update coordinator for the UniFi Access integration."""
from __future__ import annotations
import asyncio
import logging
from typing import cast
from unifi_access_api import (
ApiAuthError,
ApiConnectionError,
ApiError,
Door,
UnifiAccessApiClient,
WsMessageHandler,
)
from unifi_access_api.models.websocket import (
LocationUpdateState,
LocationUpdateV2,
V2LocationState,
V2LocationUpdate,
WebsocketMessage,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
type UnifiAccessConfigEntry = ConfigEntry[UnifiAccessCoordinator]
class UnifiAccessCoordinator(DataUpdateCoordinator[dict[str, Door]]):
"""Coordinator for fetching UniFi Access door data."""
config_entry: UnifiAccessConfigEntry
def __init__(
self,
hass: HomeAssistant,
entry: UnifiAccessConfigEntry,
client: UnifiAccessApiClient,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=entry,
name=DOMAIN,
update_interval=None,
)
self.client = client
async def _async_setup(self) -> None:
"""Set up the WebSocket connection for push updates."""
handlers: dict[str, WsMessageHandler] = {
"access.data.device.location_update_v2": self._handle_location_update,
"access.data.v2.location.update": self._handle_v2_location_update,
}
self.client.start_websocket(
handlers,
on_connect=self._on_ws_connect,
on_disconnect=self._on_ws_disconnect,
)
async def _async_update_data(self) -> dict[str, Door]:
"""Fetch all doors from the API."""
try:
async with asyncio.timeout(10):
doors = await self.client.get_doors()
except ApiAuthError as err:
raise UpdateFailed(f"Authentication failed: {err}") from err
except ApiConnectionError as err:
raise UpdateFailed(f"Error connecting to API: {err}") from err
except ApiError as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
return {door.id: door for door in doors}
def _on_ws_connect(self) -> None:
"""Handle WebSocket connection established."""
_LOGGER.debug("WebSocket connected to UniFi Access")
if not self.last_update_success:
self.config_entry.async_create_background_task(
self.hass,
self.async_request_refresh(),
"unifi_access_reconnect_refresh",
)
def _on_ws_disconnect(self) -> None:
"""Handle WebSocket disconnection."""
_LOGGER.debug("WebSocket disconnected from UniFi Access")
self.async_set_update_error(
UpdateFailed("WebSocket disconnected from UniFi Access")
)
async def _handle_location_update(self, msg: WebsocketMessage) -> None:
"""Handle location_update_v2 messages."""
update = cast(LocationUpdateV2, msg)
self._process_door_update(update.data.id, update.data.state)
async def _handle_v2_location_update(self, msg: WebsocketMessage) -> None:
"""Handle V2 location update messages."""
update = cast(V2LocationUpdate, msg)
self._process_door_update(update.data.id, update.data.state)
def _process_door_update(
self, door_id: str, ws_state: LocationUpdateState | V2LocationState | None
) -> None:
"""Process a door state update from WebSocket."""
if self.data is None or door_id not in self.data:
return
if ws_state is None:
return
current_door = self.data[door_id]
updates: dict[str, object] = {}
if ws_state.dps is not None:
updates["door_position_status"] = ws_state.dps
if ws_state.lock == "locked":
updates["door_lock_relay_status"] = "lock"
elif ws_state.lock == "unlocked":
updates["door_lock_relay_status"] = "unlock"
updated_door = current_door.with_updates(**updates)
self.async_set_updated_data({**self.data, door_id: updated_door})

View File

@@ -1,43 +0,0 @@
"""Base entity for the UniFi Access integration."""
from __future__ import annotations
from unifi_access_api import Door
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import DOMAIN
from .coordinator import UnifiAccessCoordinator
class UnifiAccessEntity(CoordinatorEntity[UnifiAccessCoordinator]):
"""Base entity for UniFi Access doors."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: UnifiAccessCoordinator,
door: Door,
key: str,
) -> None:
"""Initialize the entity."""
super().__init__(coordinator)
self._door_id = door.id
self._attr_unique_id = f"{door.id}-{key}"
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, door.id)},
name=door.name,
manufacturer="Ubiquiti",
)
@property
def available(self) -> bool:
"""Return True if entity is available."""
return super().available and self._door_id in self.coordinator.data
@property
def _door(self) -> Door:
"""Return the current door state from coordinator data."""
return self.coordinator.data[self._door_id]

View File

@@ -1,9 +0,0 @@
{
"entity": {
"button": {
"unlock": {
"default": "mdi:lock-open"
}
}
}
}

View File

@@ -1,12 +0,0 @@
{
"domain": "unifi_access",
"name": "UniFi Access",
"codeowners": ["@imhotep", "@RaHehl"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/unifi_access",
"integration_type": "hub",
"iot_class": "local_push",
"loggers": ["unifi_access_api"],
"quality_scale": "bronze",
"requirements": ["py-unifi-access==1.0.0"]
}

View File

@@ -1,66 +0,0 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not register custom actions.
appropriate-polling:
status: exempt
comment: Integration uses WebSocket push updates, no polling.
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: Integration does not register custom actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup: done
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions: done
config-entry-unloading: done
docs-configuration-parameters: todo
docs-installation-parameters: todo
entity-unavailable: todo
integration-owner: done
log-when-unavailable: todo
parallel-updates: done
reauthentication-flow: todo
test-coverage: done
# Gold
devices: done
diagnostics: todo
discovery-update-info: todo
discovery: todo
docs-data-update: todo
docs-examples: todo
docs-known-limitations: todo
docs-supported-devices: todo
docs-supported-functions: todo
docs-troubleshooting: todo
docs-use-cases: todo
dynamic-devices: todo
entity-category: todo
entity-device-class: todo
entity-disabled-by-default: todo
entity-translations: todo
exception-translations: done
icon-translations: todo
reconfiguration-flow: todo
repair-issues: todo
stale-devices: todo
# Platinum
async-dependency: done
inject-websession: done
strict-typing: todo

View File

@@ -1,38 +0,0 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"user": {
"data": {
"api_token": "[%key:common::config_flow::data::api_token%]",
"host": "[%key:common::config_flow::data::host%]",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"data_description": {
"api_token": "API token generated in the UniFi Access settings.",
"host": "Hostname or IP address of the UniFi Access controller.",
"verify_ssl": "Verify the SSL certificate of the controller."
}
}
}
},
"entity": {
"button": {
"unlock": {
"name": "Unlock"
}
}
},
"exceptions": {
"unlock_failed": {
"message": "Failed to unlock the door."
}
}
}

View File

@@ -398,7 +398,7 @@ SENSOR_DESCRIPTIONS = {
Keys.WARNING: VictronBLESensorEntityDescription(
key=Keys.WARNING,
device_class=SensorDeviceClass.ENUM,
translation_key="warning",
translation_key="alarm",
options=ALARM_OPTIONS,
),
Keys.YIELD_TODAY: VictronBLESensorEntityDescription(

View File

@@ -248,24 +248,7 @@
"name": "[%key:component::victron_ble::common::starter_voltage%]"
},
"warning": {
"name": "Warning",
"state": {
"bms_lockout": "[%key:component::victron_ble::entity::sensor::alarm::state::bms_lockout%]",
"dc_ripple": "[%key:component::victron_ble::entity::sensor::alarm::state::dc_ripple%]",
"high_starter_voltage": "[%key:component::victron_ble::entity::sensor::alarm::state::high_starter_voltage%]",
"high_temperature": "[%key:component::victron_ble::entity::sensor::alarm::state::high_temperature%]",
"high_v_ac_out": "[%key:component::victron_ble::entity::sensor::alarm::state::high_v_ac_out%]",
"high_voltage": "[%key:component::victron_ble::entity::sensor::alarm::state::high_voltage%]",
"low_soc": "[%key:component::victron_ble::entity::sensor::alarm::state::low_soc%]",
"low_starter_voltage": "[%key:component::victron_ble::entity::sensor::alarm::state::low_starter_voltage%]",
"low_temperature": "[%key:component::victron_ble::entity::sensor::alarm::state::low_temperature%]",
"low_v_ac_out": "[%key:component::victron_ble::entity::sensor::alarm::state::low_v_ac_out%]",
"low_voltage": "[%key:component::victron_ble::entity::sensor::alarm::state::low_voltage%]",
"mid_voltage": "[%key:component::victron_ble::common::midpoint_voltage%]",
"no_alarm": "[%key:component::victron_ble::entity::sensor::alarm::state::no_alarm%]",
"overload": "[%key:component::victron_ble::entity::sensor::alarm::state::overload%]",
"short_circuit": "[%key:component::victron_ble::entity::sensor::alarm::state::short_circuit%]"
}
"name": "Warning"
},
"yield_today": {
"name": "Yield today"

View File

@@ -20,8 +20,14 @@ DEVICE_CLASSES_WINDOW: dict[str, str] = {
TRIGGERS: dict[str, type[Trigger]] = {
"opened": make_cover_opened_trigger(device_classes=DEVICE_CLASSES_WINDOW),
"closed": make_cover_closed_trigger(device_classes=DEVICE_CLASSES_WINDOW),
"opened": make_cover_opened_trigger(
device_classes=DEVICE_CLASSES_WINDOW,
domains={BINARY_SENSOR_DOMAIN, COVER_DOMAIN},
),
"closed": make_cover_closed_trigger(
device_classes=DEVICE_CLASSES_WINDOW,
domains={BINARY_SENSOR_DOMAIN, COVER_DOMAIN},
),
}

View File

@@ -103,12 +103,6 @@ ZEROCONF_PROPERTIES_SCHEMA = vol.Schema(
extra=vol.ALLOW_EXTRA,
)
# USB devices to ignore in serial port selection (non-Zigbee devices)
# Format: (manufacturer, description)
IGNORED_USB_DEVICES = {
("Nabu Casa", "ZWA-2"),
}
class OptionsMigrationIntent(StrEnum):
"""Zigbee options flow intents."""
@@ -182,12 +176,7 @@ async def list_serial_ports(hass: HomeAssistant) -> list[USBDevice]:
ports.append(addon_port)
# Filter out ignored USB devices
return [
port
for port in ports
if (port.manufacturer, port.description) not in IGNORED_USB_DEVICES
]
return ports
class BaseZhaFlow(ConfigEntryBaseFlow):

View File

@@ -23,7 +23,7 @@
"universal_silabs_flasher",
"serialx"
],
"requirements": ["zha==1.0.2", "serialx==0.6.2"],
"requirements": ["zha==1.0.1", "serialx==0.6.2"],
"usb": [
{
"description": "*2652*",

View File

@@ -961,7 +961,8 @@ class HomeAssistant:
async def async_block_till_done(self, wait_background_tasks: bool = False) -> None:
"""Block until all pending work is done."""
# To flush out any call_soon_threadsafe
# Sleep twice to flush out any call_soon_threadsafe
await asyncio.sleep(0)
await asyncio.sleep(0)
start_time: float | None = None
current_task = asyncio.current_task()

View File

@@ -82,7 +82,6 @@ FLOWS = {
"aurora_abb_powerone",
"aussie_broadband",
"autarco",
"autoskope",
"awair",
"aws_s3",
"axis",
@@ -751,7 +750,6 @@ FLOWS = {
"uhoo",
"ukraine_alarm",
"unifi",
"unifi_access",
"unifiprotect",
"upb",
"upcloud",

View File

@@ -647,12 +647,6 @@
"config_flow": true,
"iot_class": "cloud_polling"
},
"autoskope": {
"name": "Autoskope",
"integration_type": "hub",
"config_flow": true,
"iot_class": "cloud_polling"
},
"avion": {
"name": "Avi-on",
"integration_type": "hub",
@@ -7379,12 +7373,6 @@
"config_flow": true,
"iot_class": "cloud_polling"
},
"unifi_access": {
"name": "UniFi Access",
"integration_type": "hub",
"config_flow": true,
"iot_class": "local_push"
},
"universal": {
"name": "Universal media player",
"integration_type": "hub",

View File

@@ -1,68 +1,14 @@
"""Helpers for automation."""
from collections.abc import Callable, Mapping
from dataclasses import dataclass
from enum import Enum
from typing import Any
import voluptuous as vol
from homeassistant.const import CONF_OPTIONS
from homeassistant.core import HomeAssistant, split_entity_id
from .entity import get_device_class_or_undefined
from .typing import ConfigType
class AnyDeviceClassType(Enum):
"""Singleton type for matching any device class."""
_singleton = 0
ANY_DEVICE_CLASS = AnyDeviceClassType._singleton # noqa: SLF001
@dataclass(frozen=True, slots=True)
class DomainSpec:
"""Describes how to match and extract a value from an entity.
Used by triggers and conditions.
"""
device_class: str | None | AnyDeviceClassType = ANY_DEVICE_CLASS
value_source: str | None = None
"""Attribute name to extract the value from, or None for state.state."""
@dataclass(frozen=True, slots=True)
class NumericalDomainSpec(DomainSpec):
"""DomainSpec with an optional value converter for numerical triggers."""
value_converter: Callable[[Any], float] | None = None
"""Optional converter for numerical values (e.g. uint8 → percentage)."""
def filter_by_domain_specs(
hass: HomeAssistant,
domain_specs: Mapping[str, DomainSpec],
entities: set[str],
) -> set[str]:
"""Filter entities matching any of the domain specs."""
result: set[str] = set()
for entity_id in entities:
if not (domain_spec := domain_specs.get(split_entity_id(entity_id)[0])):
continue
if (
domain_spec.device_class is not ANY_DEVICE_CLASS
and get_device_class_or_undefined(hass, entity_id)
!= domain_spec.device_class
):
continue
result.add(entity_id)
return result
def get_absolute_description_key(domain: str, key: str) -> str:
"""Return the absolute description key."""
if not key.startswith("_"):

View File

@@ -4,7 +4,7 @@ from __future__ import annotations
import abc
from collections import deque
from collections.abc import Callable, Container, Coroutine, Generator, Iterable, Mapping
from collections.abc import Callable, Container, Coroutine, Generator, Iterable
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, time as dt_time, timedelta
@@ -54,7 +54,7 @@ from homeassistant.const import (
STATE_UNKNOWN,
WEEKDAYS,
)
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.core import HomeAssistant, State, callback, split_entity_id
from homeassistant.exceptions import (
ConditionError,
ConditionErrorContainer,
@@ -76,8 +76,6 @@ from homeassistant.util.yaml import load_yaml_dict
from . import config_validation as cv, entity_registry as er, selector
from .automation import (
DomainSpec,
filter_by_domain_specs,
get_absolute_description_key,
get_relative_description_key,
move_options_fields_to_top_level,
@@ -334,10 +332,10 @@ ENTITY_STATE_CONDITION_SCHEMA_ANY_ALL = vol.Schema(
)
class EntityConditionBase[DomainSpecT: DomainSpec = DomainSpec](Condition):
class EntityConditionBase(Condition):
"""Base class for entity conditions."""
_domain_specs: Mapping[str, DomainSpecT]
_domain: str
_schema: vol.Schema = ENTITY_STATE_CONDITION_SCHEMA_ANY_ALL
@override
@@ -358,8 +356,12 @@ class EntityConditionBase[DomainSpecT: DomainSpec = DomainSpec](Condition):
self._behavior = config.options[ATTR_BEHAVIOR]
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities matching any of the domain specs."""
return filter_by_domain_specs(self._hass, self._domain_specs, entities)
"""Filter entities of this domain."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == self._domain
}
@abc.abstractmethod
def is_valid_state(self, entity_state: State) -> bool:
@@ -426,7 +428,7 @@ def make_entity_state_condition(
class CustomCondition(EntityStateConditionBase):
"""Condition for entity state."""
_domain_specs = {domain: DomainSpec()}
_domain = domain
_states = states_set
return CustomCondition
@@ -456,7 +458,7 @@ def make_entity_state_attribute_condition(
class CustomCondition(EntityStateAttributeConditionBase):
"""Condition for entity attribute."""
_domain_specs = {domain: DomainSpec()}
_domain = domain
_attribute = attribute
_attribute_states = attribute_states_set

View File

@@ -169,16 +169,6 @@ def get_device_class(hass: HomeAssistant, entity_id: str) -> str | None:
return entry.device_class or entry.original_device_class
def get_device_class_or_undefined(
hass: HomeAssistant, entity_id: str
) -> str | None | UndefinedType:
"""Get the device class of an entity or UNDEFINED if not found."""
try:
return get_device_class(hass, entity_id)
except HomeAssistantError:
return UNDEFINED
def get_supported_features(hass: HomeAssistant, entity_id: str) -> int:
"""Get supported features for an entity.

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import abc
import asyncio
from collections import defaultdict
from collections.abc import Callable, Coroutine, Iterable, Mapping
from collections.abc import Callable, Coroutine, Iterable
from dataclasses import dataclass, field
from enum import StrEnum
import functools
@@ -69,13 +69,11 @@ from homeassistant.util.yaml import load_yaml_dict
from . import config_validation as cv, selector
from .automation import (
DomainSpec,
NumericalDomainSpec,
filter_by_domain_specs,
get_absolute_description_key,
get_relative_description_key,
move_options_fields_to_top_level,
)
from .entity import get_device_class
from .integration_platform import async_process_integration_platforms
from .selector import TargetSelector
from .target import (
@@ -83,7 +81,7 @@ from .target import (
async_track_target_selector_state_change_event,
)
from .template import Template
from .typing import ConfigType, TemplateVarsType
from .typing import UNDEFINED, ConfigType, TemplateVarsType, UndefinedType
_LOGGER = logging.getLogger(__name__)
@@ -336,10 +334,20 @@ ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST = ENTITY_STATE_TRIGGER_SCHEMA.extend(
)
class EntityTriggerBase[DomainSpecT: DomainSpec = DomainSpec](Trigger):
def get_device_class_or_undefined(
hass: HomeAssistant, entity_id: str
) -> str | None | UndefinedType:
"""Get the device class of an entity or UNDEFINED if not found."""
try:
return get_device_class(hass, entity_id)
except HomeAssistantError:
return UNDEFINED
class EntityTriggerBase(Trigger):
"""Trigger for entity state changes."""
_domain_specs: Mapping[str, DomainSpecT]
_domains: set[str]
_schema: vol.Schema = ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST
@override
@@ -358,10 +366,6 @@ class EntityTriggerBase[DomainSpecT: DomainSpec = DomainSpec](Trigger):
self._options = config.options or {}
self._target = config.target
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities matching any of the domain specs."""
return filter_by_domain_specs(self._hass, self._domain_specs, entities)
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
"""Check if the origin state is valid and the state has changed."""
if from_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
@@ -392,6 +396,14 @@ class EntityTriggerBase[DomainSpecT: DomainSpec = DomainSpec](Trigger):
== 1
)
def entity_filter(self, entities: set[str]) -> set[str]:
"""Filter entities of these domains."""
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] in self._domains
}
@override
async def async_attach_runner(
self, run_action: TriggerActionRunner
@@ -599,22 +611,19 @@ def _get_numerical_value(
return entity_or_float
class EntityNumericalStateBase(EntityTriggerBase[NumericalDomainSpec]):
class EntityNumericalStateBase(EntityTriggerBase):
"""Base class for numerical state and state attribute triggers."""
_attributes: dict[str, str | None]
_converter: Callable[[Any], float] = float
def _get_tracked_value(self, state: State) -> Any:
"""Get the tracked numerical value from a state."""
domain_spec = self._domain_specs[split_entity_id(state.entity_id)[0]]
if domain_spec.value_source is None:
domain = split_entity_id(state.entity_id)[0]
source = self._attributes[domain]
if source is None:
return state.state
return state.attributes.get(domain_spec.value_source)
def _get_converter(self, state: State) -> Callable[[Any], float]:
"""Get the value converter for an entity."""
domain_spec = self._domain_specs[split_entity_id(state.entity_id)[0]]
if domain_spec.value_converter is not None:
return domain_spec.value_converter
return float
return state.attributes.get(source)
class EntityNumericalStateAttributeChangedTriggerBase(EntityNumericalStateBase):
@@ -645,7 +654,7 @@ class EntityNumericalStateAttributeChangedTriggerBase(EntityNumericalStateBase):
return False
try:
current_value = self._get_converter(state)(_attribute_value)
current_value = self._converter(_attribute_value)
except TypeError, ValueError:
# Value is not a valid number, don't trigger
return False
@@ -771,7 +780,7 @@ class EntityNumericalStateAttributeCrossedThresholdTriggerBase(
return False
try:
current_value = self._get_converter(state)(_attribute_value)
current_value = self._converter(_attribute_value)
except TypeError, ValueError:
# Value is not a valid number, don't trigger
return False
@@ -803,7 +812,7 @@ def make_entity_target_state_trigger(
class CustomTrigger(EntityTargetStateTriggerBase):
"""Trigger for entity state changes."""
_domain_specs = {domain: DomainSpec()}
_domains = {domain}
_to_states = to_states_set
return CustomTrigger
@@ -817,7 +826,7 @@ def make_entity_transition_trigger(
class CustomTrigger(EntityTransitionTriggerBase):
"""Trigger for conditional entity state changes."""
_domain_specs = {domain: DomainSpec()}
_domains = {domain}
_from_states = from_states
_to_states = to_states
@@ -832,34 +841,36 @@ def make_entity_origin_state_trigger(
class CustomTrigger(EntityOriginStateTriggerBase):
"""Trigger for entity "from state" changes."""
_domain_specs = {domain: DomainSpec()}
_domains = {domain}
_from_state = from_state
return CustomTrigger
def make_entity_numerical_state_changed_trigger(
domain_specs: Mapping[str, NumericalDomainSpec],
def make_entity_numerical_state_attribute_changed_trigger(
domains: set[str], attributes: dict[str, str | None]
) -> type[EntityNumericalStateAttributeChangedTriggerBase]:
"""Create a trigger for numerical state value change."""
"""Create a trigger for numerical state attribute change."""
class CustomTrigger(EntityNumericalStateAttributeChangedTriggerBase):
"""Trigger for numerical state value changes."""
"""Trigger for numerical state attribute changes."""
_domain_specs = domain_specs
_domains = domains
_attributes = attributes
return CustomTrigger
def make_entity_numerical_state_crossed_threshold_trigger(
domain_specs: Mapping[str, NumericalDomainSpec],
def make_entity_numerical_state_attribute_crossed_threshold_trigger(
domains: set[str], attributes: dict[str, str | None]
) -> type[EntityNumericalStateAttributeCrossedThresholdTriggerBase]:
"""Create a trigger for numerical state value crossing a threshold."""
"""Create a trigger for numerical state attribute change."""
class CustomTrigger(EntityNumericalStateAttributeCrossedThresholdTriggerBase):
"""Trigger for numerical state value crossing a threshold."""
"""Trigger for numerical state attribute changes."""
_domain_specs = domain_specs
_domains = domains
_attributes = attributes
return CustomTrigger
@@ -872,7 +883,7 @@ def make_entity_target_state_attribute_trigger(
class CustomTrigger(EntityTargetStateAttributeTriggerBase):
"""Trigger for entity state changes."""
_domain_specs = {domain: DomainSpec()}
_domains = {domain}
_attribute = attribute
_attribute_to_state = to_state

View File

@@ -4,6 +4,7 @@ aiodhcpwatcher==1.2.1
aiodiscover==2.7.1
aiodns==4.0.0
aiogithubapi==26.0.0
aiohasupervisor==0.3.3
aiohttp-asyncmdnsresolver==0.1.1
aiohttp-fast-zlib==0.3.0
aiohttp==3.13.3

View File

@@ -28,6 +28,10 @@ dependencies = [
# module level in `bootstrap.py` and its requirements thus need to be in
# requirements.txt to ensure they are always installed
"aiogithubapi==26.0.0",
# Integrations may depend on hassio integration without listing it to
# change behavior based on presence of supervisor. Deprecated with #127228
# Lib can be removed with 2025.11
"aiohasupervisor==0.3.3",
"aiohttp==3.13.3",
"aiohttp_cors==0.8.1",
"aiohttp-fast-zlib==0.3.0",

1
requirements.txt generated
View File

@@ -5,6 +5,7 @@
# Home Assistant Core
aiodns==4.0.0
aiogithubapi==26.0.0
aiohasupervisor==0.3.3
aiohttp-asyncmdnsresolver==0.1.1
aiohttp-fast-zlib==0.3.0
aiohttp==3.13.3

14
requirements_all.txt generated
View File

@@ -579,9 +579,6 @@ autarco==3.2.0
# homeassistant.components.husqvarna_automower_ble
automower-ble==0.2.8
# homeassistant.components.autoskope
autoskope_client==1.4.1
# homeassistant.components.generic
# homeassistant.components.stream
av==16.0.1
@@ -1882,9 +1879,6 @@ py-sucks==0.9.11
# homeassistant.components.synology_dsm
py-synologydsm-api==2.7.3
# homeassistant.components.unifi_access
py-unifi-access==1.0.0
# homeassistant.components.atome
pyAtome==0.1.1
@@ -2161,7 +2155,7 @@ pyicloud==2.4.1
pyinsteon==1.6.4
# homeassistant.components.intelliclima
pyintelliclima==0.3.1
pyintelliclima==0.2.2
# homeassistant.components.intesishome
pyintesishome==1.8.0
@@ -2488,7 +2482,7 @@ pysmappee==0.2.29
pysmarlaapi==1.0.2
# homeassistant.components.smartthings
pysmartthings==3.7.0
pysmartthings==3.6.0
# homeassistant.components.smarty
pysmarty2==0.10.3
@@ -3136,7 +3130,7 @@ ttls==1.8.3
ttn_client==1.2.3
# homeassistant.components.tuya
tuya-device-handlers==0.0.12
tuya-device-handlers==0.0.11
# homeassistant.components.tuya
tuya-device-sharing-sdk==0.2.8
@@ -3362,7 +3356,7 @@ zeroconf==0.148.0
zeversolar==0.3.2
# homeassistant.components.zha
zha==1.0.2
zha==1.0.1
# homeassistant.components.zhong_hong
zhong-hong-hvac==1.0.13

View File

@@ -537,9 +537,6 @@ autarco==3.2.0
# homeassistant.components.husqvarna_automower_ble
automower-ble==0.2.8
# homeassistant.components.autoskope
autoskope_client==1.4.1
# homeassistant.components.generic
# homeassistant.components.stream
av==16.0.1
@@ -1631,9 +1628,6 @@ py-sucks==0.9.11
# homeassistant.components.synology_dsm
py-synologydsm-api==2.7.3
# homeassistant.components.unifi_access
py-unifi-access==1.0.0
# homeassistant.components.hdmi_cec
pyCEC==0.5.2
@@ -1850,7 +1844,7 @@ pyicloud==2.4.1
pyinsteon==1.6.4
# homeassistant.components.intelliclima
pyintelliclima==0.3.1
pyintelliclima==0.2.2
# homeassistant.components.ipma
pyipma==3.0.9
@@ -2120,7 +2114,7 @@ pysmappee==0.2.29
pysmarlaapi==1.0.2
# homeassistant.components.smartthings
pysmartthings==3.7.0
pysmartthings==3.6.0
# homeassistant.components.smarty
pysmarty2==0.10.3
@@ -2639,7 +2633,7 @@ ttls==1.8.3
ttn_client==1.2.3
# homeassistant.components.tuya
tuya-device-handlers==0.0.12
tuya-device-handlers==0.0.11
# homeassistant.components.tuya
tuya-device-sharing-sdk==0.2.8
@@ -2835,7 +2829,7 @@ zeroconf==0.148.0
zeversolar==0.3.2
# homeassistant.components.zha
zha==1.0.2
zha==1.0.1
# homeassistant.components.zinvolt
zinvolt==0.3.0

View File

@@ -1,12 +0,0 @@
"""Tests for Autoskope integration."""
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
async def setup_integration(hass: HomeAssistant, config_entry: MockConfigEntry) -> None:
"""Set up the Autoskope integration."""
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()

View File

@@ -1,69 +0,0 @@
"""Test fixtures for Autoskope integration."""
from collections.abc import Generator
from json import loads
from unittest.mock import AsyncMock, patch
from autoskope_client.models import Vehicle
import pytest
from homeassistant.components.autoskope.const import DEFAULT_HOST, DOMAIN
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from tests.common import MockConfigEntry, load_fixture
@pytest.fixture
def mock_config_entry() -> MockConfigEntry:
"""Return a mock config entry."""
return MockConfigEntry(
domain=DOMAIN,
title="Autoskope (test_user)",
data={
CONF_USERNAME: "test_user",
CONF_PASSWORD: "test_password",
CONF_HOST: DEFAULT_HOST,
},
unique_id=f"test_user@{DEFAULT_HOST}",
entry_id="01AUTOSKOPE_TEST_ENTRY",
)
@pytest.fixture
def mock_vehicles() -> list[Vehicle]:
"""Return a list of mock vehicles from fixture data."""
data = loads(load_fixture("vehicles.json", DOMAIN))
return [
Vehicle.from_api(vehicle, data["positions"]) for vehicle in data["vehicles"]
]
@pytest.fixture
def mock_autoskope_client(mock_vehicles: list[Vehicle]) -> Generator[AsyncMock]:
"""Mock the Autoskope API client."""
with (
patch(
"homeassistant.components.autoskope.AutoskopeApi",
autospec=True,
) as mock_client,
patch(
"homeassistant.components.autoskope.config_flow.AutoskopeApi",
new=mock_client,
),
):
client = mock_client.return_value
client.connect.return_value = None
client.get_vehicles.return_value = mock_vehicles
client.__aenter__.return_value = client
client.__aexit__.return_value = None
yield client
@pytest.fixture
def mock_setup_entry() -> Generator[AsyncMock]:
"""Override async_setup_entry."""
with patch(
"homeassistant.components.autoskope.async_setup_entry",
return_value=True,
) as mock_setup:
yield mock_setup

View File

@@ -1,33 +0,0 @@
{
"vehicles": [
{
"id": "12345",
"name": "Test Vehicle",
"ex_pow": 12.5,
"bat_pow": 3.7,
"hdop": 1.2,
"support_infos": {
"imei": "123456789012345"
},
"model": "Autoskope"
}
],
"positions": {
"type": "FeatureCollection",
"features": [
{
"type": "Feature",
"geometry": {
"type": "Point",
"coordinates": [8.6821267, 50.1109221]
},
"properties": {
"carid": "12345",
"s": 0,
"dt": "2025-05-28T10:00:00Z",
"park": false
}
}
]
}
}

View File

@@ -1,55 +0,0 @@
# serializer version: 1
# name: test_all_entities[device_tracker.test_vehicle-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'device_tracker',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'device_tracker.test_vehicle',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': 'mdi:car',
'original_name': None,
'platform': 'autoskope',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '12345',
'unit_of_measurement': None,
})
# ---
# name: test_all_entities[device_tracker.test_vehicle-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Test Vehicle',
'gps_accuracy': 6.0,
'icon': 'mdi:car',
'latitude': 50.1109221,
'longitude': 8.6821267,
'source_type': <SourceType.GPS: 'gps'>,
}),
'context': <ANY>,
'entity_id': 'device_tracker.test_vehicle',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'not_home',
})
# ---

View File

@@ -1,167 +0,0 @@
"""Test Autoskope config flow."""
from unittest.mock import AsyncMock
from autoskope_client.models import CannotConnect, InvalidAuth
import pytest
from homeassistant.components.autoskope.const import (
DEFAULT_HOST,
DOMAIN,
SECTION_ADVANCED_SETTINGS,
)
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from tests.common import MockConfigEntry
USER_INPUT = {
CONF_USERNAME: "test_user",
CONF_PASSWORD: "test_password",
SECTION_ADVANCED_SETTINGS: {
CONF_HOST: DEFAULT_HOST,
},
}
async def test_full_flow(
hass: HomeAssistant,
mock_autoskope_client: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test full user config flow from form to entry creation."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {}
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Autoskope (test_user)"
assert result["data"] == {
CONF_USERNAME: "test_user",
CONF_PASSWORD: "test_password",
CONF_HOST: DEFAULT_HOST,
}
assert result["result"].unique_id == f"test_user@{DEFAULT_HOST}"
@pytest.mark.parametrize(
("exception", "error"),
[
(InvalidAuth("Invalid credentials"), "invalid_auth"),
(CannotConnect("Connection failed"), "cannot_connect"),
],
)
async def test_flow_errors(
hass: HomeAssistant,
mock_autoskope_client: AsyncMock,
mock_setup_entry: AsyncMock,
exception: Exception,
error: str,
) -> None:
"""Test config flow error handling with recovery."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
mock_autoskope_client.__aenter__.side_effect = exception
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": error}
# Recovery: clear the error and retry
mock_autoskope_client.__aenter__.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
async def test_flow_invalid_url(
hass: HomeAssistant,
mock_autoskope_client: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test config flow rejects invalid URL with recovery."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: "test_user",
CONF_PASSWORD: "test_password",
SECTION_ADVANCED_SETTINGS: {
CONF_HOST: "not-a-valid-url",
},
},
)
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": "invalid_url"}
# Recovery: provide a valid URL
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] is FlowResultType.CREATE_ENTRY
async def test_already_configured(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
) -> None:
"""Test aborting if already configured."""
mock_config_entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
USER_INPUT,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_custom_host(
hass: HomeAssistant,
mock_autoskope_client: AsyncMock,
mock_setup_entry: AsyncMock,
) -> None:
"""Test config flow with a custom white-label host."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_USERNAME: "test_user",
CONF_PASSWORD: "test_password",
SECTION_ADVANCED_SETTINGS: {
CONF_HOST: "https://custom.autoskope.server",
},
},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"][CONF_HOST] == "https://custom.autoskope.server"
assert result["result"].unique_id == "test_user@https://custom.autoskope.server"

View File

@@ -1,232 +0,0 @@
"""Test Autoskope device tracker."""
from unittest.mock import AsyncMock
from autoskope_client.models import CannotConnect, InvalidAuth, Vehicle, VehiclePosition
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.autoskope.const import DOMAIN, UPDATE_INTERVAL
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from . import setup_integration
from tests.common import MockConfigEntry, async_fire_time_changed, snapshot_platform
async def test_all_entities(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
entity_registry: er.EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test all entities with snapshot."""
await setup_integration(hass, mock_config_entry)
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
@pytest.mark.parametrize(
("speed", "park_mode", "has_position", "expected_icon"),
[
(50, False, True, "mdi:car-arrow-right"),
(0, True, True, "mdi:car-brake-parking"),
(2, False, True, "mdi:car"),
(0, False, False, "mdi:car-clock"),
],
ids=["moving", "parked", "idle", "no_position"],
)
async def test_vehicle_icons(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
speed: int,
park_mode: bool,
has_position: bool,
expected_icon: str,
) -> None:
"""Test device tracker icon for different vehicle states."""
position = (
VehiclePosition(
latitude=50.1109221,
longitude=8.6821267,
speed=speed,
timestamp="2025-05-28T10:00:00Z",
park_mode=park_mode,
)
if has_position
else None
)
mock_autoskope_client.get_vehicles.return_value = [
Vehicle(
id="12345",
name="Test Vehicle",
position=position,
external_voltage=12.5,
battery_voltage=3.7,
gps_quality=1.2,
imei="123456789012345",
model="Autoskope",
)
]
await setup_integration(hass, mock_config_entry)
state = hass.states.get("device_tracker.test_vehicle")
assert state is not None
assert state.attributes["icon"] == expected_icon
async def test_entity_unavailable_on_coordinator_error(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test entity becomes unavailable when coordinator update fails."""
await setup_integration(hass, mock_config_entry)
state = hass.states.get("device_tracker.test_vehicle")
assert state is not None
assert state.state != STATE_UNAVAILABLE
# Simulate connection error on next update
mock_autoskope_client.get_vehicles.side_effect = CannotConnect("Connection lost")
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get("device_tracker.test_vehicle")
assert state is not None
assert state.state == STATE_UNAVAILABLE
async def test_entity_recovers_after_error(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
mock_vehicles: list[Vehicle],
freezer: FrozenDateTimeFactory,
) -> None:
"""Test entity recovers after a transient coordinator error."""
await setup_integration(hass, mock_config_entry)
# Simulate error
mock_autoskope_client.get_vehicles.side_effect = CannotConnect("Connection lost")
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get("device_tracker.test_vehicle").state == STATE_UNAVAILABLE
# Recover
mock_autoskope_client.get_vehicles.side_effect = None
mock_autoskope_client.get_vehicles.return_value = mock_vehicles
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get("device_tracker.test_vehicle")
assert state.state != STATE_UNAVAILABLE
async def test_reauth_success(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
mock_vehicles: list[Vehicle],
freezer: FrozenDateTimeFactory,
) -> None:
"""Test entity stays available after successful re-authentication."""
await setup_integration(hass, mock_config_entry)
# First get_vehicles raises InvalidAuth, retry after authenticate succeeds
mock_autoskope_client.get_vehicles.side_effect = [
InvalidAuth("Token expired"),
mock_vehicles,
]
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get("device_tracker.test_vehicle")
assert state is not None
assert state.state != STATE_UNAVAILABLE
async def test_reauth_failure(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
mock_vehicles: list[Vehicle],
freezer: FrozenDateTimeFactory,
) -> None:
"""Test entity becomes unavailable on permanent auth failure."""
await setup_integration(hass, mock_config_entry)
# get_vehicles raises InvalidAuth, and re-authentication also fails
mock_autoskope_client.get_vehicles.side_effect = InvalidAuth("Token expired")
mock_autoskope_client.authenticate.side_effect = InvalidAuth("Invalid credentials")
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get("device_tracker.test_vehicle")
assert state is not None
assert state.state == STATE_UNAVAILABLE
# Clean up side effects to prevent teardown errors
mock_autoskope_client.get_vehicles.side_effect = None
mock_autoskope_client.authenticate.side_effect = None
mock_autoskope_client.get_vehicles.return_value = mock_vehicles
async def test_vehicle_name_update(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test device name updates in device registry when vehicle is renamed."""
await setup_integration(hass, mock_config_entry)
device_registry = dr.async_get(hass)
device_entry = device_registry.async_get_device(identifiers={(DOMAIN, "12345")})
assert device_entry is not None
assert device_entry.name == "Test Vehicle"
# Simulate vehicle rename on Autoskope side
mock_autoskope_client.get_vehicles.return_value = [
Vehicle(
id="12345",
name="Renamed Vehicle",
position=VehiclePosition(
latitude=50.1109221,
longitude=8.6821267,
speed=0,
timestamp="2025-05-28T10:00:00Z",
park_mode=True,
),
external_voltage=12.5,
battery_voltage=3.7,
gps_quality=1.2,
imei="123456789012345",
model="Autoskope",
)
]
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
# Device registry should reflect the new name
device_entry = device_registry.async_get_device(identifiers={(DOMAIN, "12345")})
assert device_entry is not None
assert device_entry.name == "Renamed Vehicle"

View File

@@ -1,48 +0,0 @@
"""Test Autoskope integration setup."""
from unittest.mock import AsyncMock
from autoskope_client.models import CannotConnect, InvalidAuth
import pytest
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from . import setup_integration
from tests.common import MockConfigEntry
async def test_setup_entry(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
) -> None:
"""Test successful setup and unload of entry."""
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is ConfigEntryState.LOADED
await hass.config_entries.async_unload(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.NOT_LOADED
@pytest.mark.parametrize(
("exception", "expected_state"),
[
(InvalidAuth("Invalid credentials"), ConfigEntryState.SETUP_ERROR),
(CannotConnect("Connection failed"), ConfigEntryState.SETUP_RETRY),
],
)
async def test_setup_entry_errors(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_autoskope_client: AsyncMock,
exception: Exception,
expected_state: ConfigEntryState,
) -> None:
"""Test setup with authentication and connection errors."""
mock_autoskope_client.connect.side_effect = exception
await setup_integration(hass, mock_config_entry)
assert mock_config_entry.state is expected_state

View File

@@ -13,7 +13,6 @@ import string
from typing import TYPE_CHECKING, Any
from unittest.mock import AsyncMock, MagicMock, patch
from aiohasupervisor import SupervisorNotFoundError
from aiohasupervisor.models import (
Discovery,
GreenInfo,
@@ -314,7 +313,6 @@ def addon_not_installed_fixture(
"""Mock add-on not installed."""
from .hassio.common import mock_addon_not_installed # noqa: PLC0415
addon_info.side_effect = SupervisorNotFoundError
return mock_addon_not_installed(addon_store_info, addon_info)

View File

@@ -3,15 +3,11 @@
from __future__ import annotations
import asyncio
from typing import Any
from unittest.mock import AsyncMock, call
from uuid import uuid4
from aiohasupervisor import (
AddonNotSupportedArchitectureError,
AddonNotSupportedHomeAssistantVersionError,
AddonNotSupportedMachineTypeError,
SupervisorError,
)
from aiohasupervisor import SupervisorError
from aiohasupervisor.models import AddonsOptions, Discovery, PartialBackupOptions
import pytest
@@ -24,8 +20,10 @@ from homeassistant.components.hassio.addon_manager import (
from homeassistant.core import HomeAssistant
@pytest.mark.usefixtures("addon_not_installed")
async def test_not_installed_raises_exception(addon_manager: AddonManager) -> None:
async def test_not_installed_raises_exception(
addon_manager: AddonManager,
addon_not_installed: dict[str, Any],
) -> None:
"""Test addon not installed raises exception."""
addon_config = {"test_key": "test"}
@@ -40,40 +38,24 @@ async def test_not_installed_raises_exception(addon_manager: AddonManager) -> No
assert str(err.value) == "Test app is not installed"
@pytest.mark.parametrize(
"exception",
[
AddonNotSupportedArchitectureError(
"Add-on test not supported on this platform, supported architectures: test"
),
AddonNotSupportedHomeAssistantVersionError(
"Add-on test not supported on this system, requires Home Assistant version 2026.1.0 or greater"
),
AddonNotSupportedMachineTypeError(
"Add-on test not supported on this machine, supported machine types: test"
),
],
)
async def test_not_available_raises_exception(
addon_manager: AddonManager,
supervisor_client: AsyncMock,
addon_store_info: AsyncMock,
addon_info: AsyncMock,
exception: SupervisorError,
) -> None:
"""Test addon not available raises exception."""
supervisor_client.store.addon_availability.side_effect = exception
supervisor_client.store.install_addon.side_effect = exception
addon_info.return_value.update_available = True
addon_store_info.return_value.available = False
addon_info.return_value.available = False
with pytest.raises(AddonError) as err:
await addon_manager.async_install_addon()
assert str(err.value) == f"Test app is not available: {exception!s}"
assert str(err.value) == "Test app is not available"
with pytest.raises(AddonError) as err:
await addon_manager.async_update_addon()
assert str(err.value) == f"Test app is not available: {exception!s}"
assert str(err.value) == "Test app is not available"
async def test_get_addon_discovery_info(
@@ -514,10 +496,11 @@ async def test_stop_addon_error(
assert stop_addon.call_count == 1
@pytest.mark.usefixtures("hass", "addon_installed")
async def test_update_addon(
hass: HomeAssistant,
addon_manager: AddonManager,
addon_info: AsyncMock,
addon_installed: AsyncMock,
create_backup: AsyncMock,
update_addon: AsyncMock,
) -> None:
@@ -526,7 +509,7 @@ async def test_update_addon(
await addon_manager.async_update_addon()
assert addon_info.call_count == 1
assert addon_info.call_count == 2
assert create_backup.call_count == 1
assert create_backup.call_args == call(
PartialBackupOptions(name="addon_test_addon_1.0.0", addons={"test_addon"})
@@ -534,10 +517,10 @@ async def test_update_addon(
assert update_addon.call_count == 1
@pytest.mark.usefixtures("addon_installed")
async def test_update_addon_no_update(
addon_manager: AddonManager,
addon_info: AsyncMock,
addon_installed: AsyncMock,
create_backup: AsyncMock,
update_addon: AsyncMock,
) -> None:
@@ -551,10 +534,11 @@ async def test_update_addon_no_update(
assert update_addon.call_count == 0
@pytest.mark.usefixtures("hass", "addon_installed")
async def test_update_addon_error(
hass: HomeAssistant,
addon_manager: AddonManager,
addon_info: AsyncMock,
addon_installed: AsyncMock,
create_backup: AsyncMock,
update_addon: AsyncMock,
) -> None:
@@ -567,7 +551,7 @@ async def test_update_addon_error(
assert str(err.value) == "Failed to update the Test app: Boom"
assert addon_info.call_count == 1
assert addon_info.call_count == 2
assert create_backup.call_count == 1
assert create_backup.call_args == call(
PartialBackupOptions(name="addon_test_addon_1.0.0", addons={"test_addon"})
@@ -575,10 +559,11 @@ async def test_update_addon_error(
assert update_addon.call_count == 1
@pytest.mark.usefixtures("hass", "addon_installed")
async def test_schedule_update_addon(
hass: HomeAssistant,
addon_manager: AddonManager,
addon_info: AsyncMock,
addon_installed: AsyncMock,
create_backup: AsyncMock,
update_addon: AsyncMock,
) -> None:
@@ -604,7 +589,7 @@ async def test_schedule_update_addon(
await asyncio.gather(update_task, update_task_two)
assert addon_manager.task_in_progress() is False
assert addon_info.call_count == 2
assert addon_info.call_count == 3
assert create_backup.call_count == 1
assert create_backup.call_args == call(
PartialBackupOptions(name="addon_test_addon_1.0.0", addons={"test_addon"})

View File

@@ -906,6 +906,7 @@ async def test_config_flow_thread_addon_already_installed(
}
@pytest.mark.usefixtures("addon_not_installed")
async def test_options_flow_zigbee_to_thread(
hass: HomeAssistant,
install_addon: AsyncMock,

View File

@@ -4,7 +4,6 @@ from collections.abc import Generator
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
from pyintelliclima.const import FanMode, FanSpeed
from pyintelliclima.intelliclima_types import (
IntelliClimaDevices,
IntelliClimaECO,
@@ -51,9 +50,9 @@ def single_eco_device() -> IntelliClimaDevices:
model=IntelliClimaModelType(modello="ECO", tipo="wifi"),
name="Test VMC",
houses_id="12345",
mode_set=FanMode.inward,
mode_set="1",
mode_state="1",
speed_set=FanSpeed.medium,
speed_set="3",
speed_state="3",
last_online="2025-11-18 10:22:51",
creation_date="2025-11-18 10:22:51",

View File

@@ -3,7 +3,6 @@
from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, patch
from pyintelliclima.const import FanMode, FanSpeed
import pytest
from syrupy.assertion import SnapshotAssertion
@@ -104,7 +103,7 @@ async def test_fan_turn_on_service_calls_api(
# Device serial from single_eco_device.crono_sn
mock_cloud_interface.ecocomfort.set_mode_speed.assert_awaited_once_with(
"11223344", FanMode.inward, FanSpeed.low
"11223344", "1", "2"
)
@@ -120,10 +119,10 @@ async def test_fan_set_percentage_maps_to_speed(
{ATTR_ENTITY_ID: FAN_ENTITY_ID, ATTR_PERCENTAGE: 15},
blocking=True,
)
# Initial mode_set=FanMode.inward from single_eco_device.
# Sleep speed is FanSpeed.sleep (25%).
# Initial mode_set="1" (forward) from single_eco_device.
# Sleep speed is "1" (25%).
mock_cloud_interface.ecocomfort.set_mode_speed.assert_awaited_once_with(
"11223344", FanMode.inward, FanSpeed.sleep
"11223344", "1", "1"
)
@@ -166,18 +165,18 @@ async def test_fan_set_percentage_zero_turns_off(
("service_data", "expected_mode", "expected_speed"),
[
# percentage=None, preset_mode=None -> defaults to previous speed > 75% (medium),
# previous mode > FanMode.inward
({}, FanMode.inward, FanSpeed.medium),
# percentage=0, preset_mode=None -> default 25% (FanSpeed.sleep), previous mode (inward)
({ATTR_PERCENTAGE: 0}, FanMode.inward, FanSpeed.sleep),
# previous mode > "inward"
({}, "1", "3"),
# percentage=0, preset_mode=None -> default 25% (sleep), previous mode (inward)
({ATTR_PERCENTAGE: 0}, "1", "1"),
],
)
async def test_fan_turn_on_defaulting_behavior(
hass: HomeAssistant,
mock_cloud_interface: AsyncMock,
service_data: dict,
expected_mode: FanMode,
expected_speed: FanSpeed,
expected_mode: str,
expected_speed: str,
) -> None:
"""turn_on defaults percentage/preset as expected."""
data = {ATTR_ENTITY_ID: FAN_ENTITY_ID} | service_data

View File

@@ -86,10 +86,10 @@ async def test_select_option_keeps_current_speed(
{ATTR_ENTITY_ID: SELECT_ENTITY_ID, ATTR_OPTION: option},
blocking=True,
)
# Device starts with speed_set=FanSpeed.medium (from single_eco_device in conftest),
# Device starts with speed_set="3" (from single_eco_device in conftest),
# mode is not off and not auto, so current speed is preserved.
mock_cloud_interface.ecocomfort.set_mode_speed.assert_awaited_once_with(
"11223344", expected_mode, FanSpeed.medium
"11223344", expected_mode, "3"
)
@@ -119,9 +119,9 @@ async def test_select_option_in_auto_mode_defaults_speed_to_sleep(
mock_cloud_interface: AsyncMock,
single_eco_device,
) -> None:
"""When speed_set is FanSpeed.auto_get (auto preset), selecting an option defaults to sleep speed."""
"""When speed_set is FanSpeed.auto (auto preset), selecting an option defaults to sleep speed."""
eco = list(single_eco_device.ecocomfort2_devices.values())[0]
eco.speed_set = FanSpeed.auto_get
eco.speed_set = FanSpeed.auto
eco.mode_set = FanMode.sensor
await hass.services.async_call(

View File

@@ -1,24 +0,0 @@
# serializer version: 1
# name: test_diagnostics
dict({
'pets': list([
]),
'robots': list([
dict({
'cleanCycleWaitTimeMinutes': '7',
'cycleCapacity': '30',
'cycleCount': '15',
'cyclesAfterDrawerFull': '0',
'lastSeen': '2022-09-17T13:06:37.884Z',
'litterRobotId': '**REDACTED**',
'litterRobotNickname': 'Test',
'litterRobotSerial': '**REDACTED**',
'nightLightActive': '1',
'panelLockActive': '0',
'powerStatus': 'AC',
'sleepModeActive': '112:50:19',
'unitStatus': 'RDY',
}),
]),
})
# ---

View File

@@ -3,12 +3,10 @@
from unittest.mock import MagicMock
from freezegun import freeze_time
import pytest
from homeassistant.components.button import DOMAIN as BUTTON_DOMAIN, SERVICE_PRESS
from homeassistant.const import ATTR_ENTITY_ID, STATE_UNKNOWN, EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from .conftest import setup_integration
@@ -44,18 +42,3 @@ async def test_button(
state = hass.states.get(BUTTON_ENTITY)
assert state
assert state.state == "2021-11-15T10:37:00+00:00"
async def test_button_command_exception(
hass: HomeAssistant, mock_account_with_side_effects: MagicMock
) -> None:
"""Test that LitterRobotException is wrapped in HomeAssistantError."""
await setup_integration(hass, mock_account_with_side_effects, BUTTON_DOMAIN)
with pytest.raises(HomeAssistantError, match="Invalid command: oops"):
await hass.services.async_call(
BUTTON_DOMAIN,
SERVICE_PRESS,
{ATTR_ENTITY_ID: BUTTON_ENTITY},
blocking=True,
)

View File

@@ -1,81 +0,0 @@
"""Tests for the Litter-Robot coordinator."""
from unittest.mock import MagicMock
from freezegun.api import FrozenDateTimeFactory
from pylitterbot.exceptions import LitterRobotException, LitterRobotLoginException
from homeassistant.components.litterrobot.const import DOMAIN
from homeassistant.components.litterrobot.coordinator import UPDATE_INTERVAL
from homeassistant.components.vacuum import DOMAIN as VACUUM_DOMAIN
from homeassistant.config_entries import SOURCE_REAUTH
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from .common import VACUUM_ENTITY_ID
from .conftest import setup_integration
from tests.common import async_fire_time_changed
async def test_coordinator_update_error(
hass: HomeAssistant,
mock_account: MagicMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test entities become unavailable when coordinator update fails."""
await setup_integration(hass, mock_account, VACUUM_DOMAIN)
assert (state := hass.states.get(VACUUM_ENTITY_ID))
assert state.state != STATE_UNAVAILABLE
# Simulate an API error during update
mock_account.refresh_robots.side_effect = LitterRobotException("Unable to connect")
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert (state := hass.states.get(VACUUM_ENTITY_ID))
assert state.state == STATE_UNAVAILABLE
# Recover
mock_account.refresh_robots.side_effect = None
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert (state := hass.states.get(VACUUM_ENTITY_ID))
assert state.state != STATE_UNAVAILABLE
async def test_coordinator_update_auth_error(
hass: HomeAssistant,
mock_account: MagicMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test reauthentication flow is triggered on login error during update."""
entry = await setup_integration(hass, mock_account, VACUUM_DOMAIN)
assert (state := hass.states.get(VACUUM_ENTITY_ID))
assert state.state != STATE_UNAVAILABLE
# Simulate an authentication error during update
mock_account.refresh_robots.side_effect = LitterRobotLoginException(
"Invalid credentials"
)
freezer.tick(UPDATE_INTERVAL)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert (state := hass.states.get(VACUUM_ENTITY_ID))
assert state.state == STATE_UNAVAILABLE
# Ensure a reauthentication flow was triggered
flows = hass.config_entries.flow.async_progress()
assert len(flows) == 1
flow = flows[0]
assert flow["step_id"] == "reauth_confirm"
assert flow["handler"] == DOMAIN
assert flow["context"].get("source") == SOURCE_REAUTH
assert flow["context"].get("entry_id") == entry.entry_id

Some files were not shown because too many files have changed in this diff Show More