Compare commits

...

38 Commits

Author SHA1 Message Date
Willem-Jan van Rootselaar
324ed65999 add codeowner to homevolt (#164097) 2026-02-25 19:46:41 +01:00
Maikel Punie
42428b91bb Bump velbusaio to 2026.2.0 (#164093) 2026-02-25 19:41:17 +01:00
Glenn de Haan
c41dd3e3a8 Bump hdfury to 1.6.0 (#164088) 2026-02-25 19:40:11 +01:00
Joost Lekkerkerker
02171a1da0 Add Zinvolt power sensor (#164092) 2026-02-25 18:58:25 +01:00
konsulten
19c7f663ca Add diagnostic to systemnexa2 integration (#164090) 2026-02-25 18:51:51 +01:00
Matthias Alphart
87bd04af5a Update knx-frontend to 2026.2.25.165736 (#164089) 2026-02-25 18:50:21 +01:00
Jamie Magee
5af6227ad7 Add action exceptions for cover commands in aladdin_connect (#164087) 2026-02-25 18:45:04 +01:00
Robert Resch
9b56f936fd Bump uv to 0.10.6 (#164086) 2026-02-25 18:36:07 +01:00
Joost Lekkerkerker
f2afd324d9 Make Zinvolt battery state a non diagnostic sensor (#164071) 2026-02-25 18:22:23 +01:00
Joost Lekkerkerker
173aab5233 Refresh coordinator in Zinvolt after setting value (#164069) 2026-02-25 18:19:58 +01:00
Joost Lekkerkerker
1d97729547 Use different name source in Zinvolt (#164072) 2026-02-25 18:18:52 +01:00
konsulten
91ca674a36 Add sensor platform to systemnexa2 (#163961) 2026-02-25 18:18:12 +01:00
Joost Lekkerkerker
6157802fb5 Set initiate flow for Zinvolt (#164054) 2026-02-25 18:18:10 +01:00
Joost Lekkerkerker
7e3b7a0c02 Add integration_type device to zerproc (#163998) 2026-02-25 18:17:56 +01:00
Joost Lekkerkerker
6a5455d7a5 Add integration_type device to wiffi (#163978) 2026-02-25 18:17:23 +01:00
Kamil Breguła
09765fe53d Fix AWS S3 config flow endpoint URL validation (#164085)
Co-authored-by: mik-laj <12058428+mik-laj@users.noreply.github.com>
2026-02-25 18:17:04 +01:00
Felix Eckhofer
2fccbd6e47 dwd_weather_warnings: Filter expired warnings (#163096) 2026-02-25 18:16:44 +01:00
Jamie Magee
ef7cccbe3f Handle coordinator update errors in aladdin_connect (#164084) 2026-02-25 18:15:40 +01:00
Jamie Magee
a704c2d44b Add parallel updates to aladdin_connect (#164082) 2026-02-25 18:06:43 +01:00
Robert Resch
f12c5b627d Remove building wheels for Python 3.13 (#164083) 2026-02-25 18:05:32 +01:00
Bram Kragten
b241054a96 Update frontend to 20260225.0 (#164076) 2026-02-25 17:55:00 +01:00
Erik Montnemery
0fd515404d Fix smarla test snapshots (#164078) 2026-02-25 17:50:06 +01:00
Erik Montnemery
52382b7fe5 Fix ntfy test snapshots (#164079) 2026-02-25 17:49:46 +01:00
Thomas D
209af5dccc Adjust service description for Volvo integration (#164073) 2026-02-25 17:46:34 +01:00
Liquidmasl
227d2e8de6 Sonarr coordinator refactor (#164077)
Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2026-02-25 17:46:18 +01:00
Erwin Douna
96d50565f9 Portainer optimize switch (#163520)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Robert Resch <robert@resch.dev>
2026-02-25 17:39:49 +01:00
Tom
80fc3691d8 Align airOS add_entities consumption in sensor (#164055)
Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2026-02-25 17:25:51 +01:00
Christian Lackas
15e00f6ffa Add siren support for HmIP-MP3P (Combination Signalling Device) (#161634)
Co-authored-by: Joost Lekkerkerker <joostlek@outlook.com>
2026-02-25 17:16:56 +01:00
Brett Adams
f25b437832 Add quality scale to Tessie integration (#160499)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Tom <CoMPaTech@users.noreply.github.com>
2026-02-25 17:10:41 +01:00
Franck Nijhof
2e34d4d3a6 Add brands system integration to proxy brand images through local API (#163960)
Co-authored-by: Robert Resch <robert@resch.dev>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-25 17:10:28 +01:00
Liquidmasl
b81b12f094 Sonarr service calls instead of sensor attributes (#161199)
Co-authored-by: Joostlek <joostlek@outlook.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-25 17:09:06 +01:00
Erwin Douna
7446d5ea7c Add reconfigure flow to Fully Kiosk (#161840) 2026-02-25 17:08:43 +01:00
Matt Zimmerman
7b811cddce Use has_entity_name in SmartTub entities (#162374)
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-02-25 16:45:48 +01:00
Paul Bottein
19545f29dc Use show in sidebar property instead of removing panel title and icon (#164025) 2026-02-25 16:37:15 +01:00
Jamie Magee
e591291cbe Add platform tests for aladdin_connect cover and sensor (#164011) 2026-02-25 16:20:19 +01:00
Joost Lekkerkerker
cb990823cd Improve platforms pylint plugin (#164067) 2026-02-25 16:15:28 +01:00
Willem-Jan van Rootselaar
2cfafc04ce Bump python-bsblan to 5.1.0 (#164064) 2026-02-25 15:57:07 +01:00
Ludovic BOUÉ
0563037c5a Fix MatterValve state handling and allow None values for attributes (#164066) 2026-02-25 15:57:05 +01:00
162 changed files with 5339 additions and 465 deletions

View File

@@ -110,7 +110,7 @@ jobs:
strategy:
fail-fast: false
matrix:
abi: ["cp313", "cp314"]
abi: ["cp314"]
arch: ["amd64", "aarch64"]
include:
- arch: amd64
@@ -161,7 +161,7 @@ jobs:
strategy:
fail-fast: false
matrix:
abi: ["cp313", "cp314"]
abi: ["cp314"]
arch: ["amd64", "aarch64"]
include:
- arch: amd64

6
CODEOWNERS generated
View File

@@ -242,6 +242,8 @@ build.json @home-assistant/supervisor
/tests/components/bosch_alarm/ @mag1024 @sanjay900
/homeassistant/components/bosch_shc/ @tschamm
/tests/components/bosch_shc/ @tschamm
/homeassistant/components/brands/ @home-assistant/core
/tests/components/brands/ @home-assistant/core
/homeassistant/components/braviatv/ @bieniu @Drafteed
/tests/components/braviatv/ @bieniu @Drafteed
/homeassistant/components/bring/ @miaucl @tr4nt0r
@@ -717,8 +719,8 @@ build.json @home-assistant/supervisor
/tests/components/homematic/ @pvizeli
/homeassistant/components/homematicip_cloud/ @hahn-th @lackas
/tests/components/homematicip_cloud/ @hahn-th @lackas
/homeassistant/components/homevolt/ @danielhiversen
/tests/components/homevolt/ @danielhiversen
/homeassistant/components/homevolt/ @danielhiversen @liudger
/tests/components/homevolt/ @danielhiversen @liudger
/homeassistant/components/homewizard/ @DCSBL
/tests/components/homewizard/ @DCSBL
/homeassistant/components/honeywell/ @rdfurman @mkmer

2
Dockerfile generated
View File

@@ -30,7 +30,7 @@ RUN \
# Verify go2rtc can be executed
go2rtc --version \
# Install uv
&& pip3 install uv==0.9.26
&& pip3 install uv==0.10.6
WORKDIR /usr/src

View File

@@ -210,6 +210,7 @@ DEFAULT_INTEGRATIONS = {
"analytics", # Needed for onboarding
"application_credentials",
"backup",
"brands",
"frontend",
"hardware",
"labs",

View File

@@ -89,11 +89,10 @@ async def async_setup_entry(
"""Set up the AirOS binary sensors from a config entry."""
coordinator = config_entry.runtime_data
entities: list[BinarySensorEntity] = []
entities.extend(
entities = [
AirOSBinarySensor(coordinator, description)
for description in COMMON_BINARY_SENSORS
)
]
if coordinator.device_data["fw_major"] == 8:
entities.extend(

View File

@@ -182,15 +182,15 @@ async def async_setup_entry(
"""Set up the AirOS sensors from a config entry."""
coordinator = config_entry.runtime_data
async_add_entities(
AirOSSensor(coordinator, description) for description in COMMON_SENSORS
)
entities = [AirOSSensor(coordinator, description) for description in COMMON_SENSORS]
if coordinator.device_data["fw_major"] == 8:
async_add_entities(
entities.extend(
AirOSSensor(coordinator, description) for description in AIROS8_SENSORS
)
async_add_entities(entities)
class AirOSSensor(AirOSEntity, SensorEntity):
"""Representation of a Sensor."""

View File

@@ -5,12 +5,13 @@ from __future__ import annotations
from datetime import timedelta
import logging
import aiohttp
from genie_partner_sdk.client import AladdinConnectClient
from genie_partner_sdk.model import GarageDoor
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
_LOGGER = logging.getLogger(__name__)
type AladdinConnectConfigEntry = ConfigEntry[dict[str, AladdinConnectCoordinator]]
@@ -40,7 +41,10 @@ class AladdinConnectCoordinator(DataUpdateCoordinator[GarageDoor]):
async def _async_update_data(self) -> GarageDoor:
"""Fetch data from the Aladdin Connect API."""
await self.client.update_door(self.data.device_id, self.data.door_number)
try:
await self.client.update_door(self.data.device_id, self.data.door_number)
except aiohttp.ClientError as err:
raise UpdateFailed(f"Error communicating with API: {err}") from err
self.data.status = self.client.get_door_status(
self.data.device_id, self.data.door_number
)

View File

@@ -4,14 +4,19 @@ from __future__ import annotations
from typing import Any
import aiohttp
from homeassistant.components.cover import CoverDeviceClass, CoverEntity
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import SUPPORTED_FEATURES
from .const import DOMAIN, SUPPORTED_FEATURES
from .coordinator import AladdinConnectConfigEntry, AladdinConnectCoordinator
from .entity import AladdinConnectEntity
PARALLEL_UPDATES = 1
async def async_setup_entry(
hass: HomeAssistant,
@@ -40,11 +45,23 @@ class AladdinCoverEntity(AladdinConnectEntity, CoverEntity):
async def async_open_cover(self, **kwargs: Any) -> None:
"""Issue open command to cover."""
await self.client.open_door(self._device_id, self._number)
try:
await self.client.open_door(self._device_id, self._number)
except aiohttp.ClientError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="open_door_failed",
) from err
async def async_close_cover(self, **kwargs: Any) -> None:
"""Issue close command to cover."""
await self.client.close_door(self._device_id, self._number)
try:
await self.client.close_door(self._device_id, self._number)
except aiohttp.ClientError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="close_door_failed",
) from err
@property
def is_closed(self) -> bool | None:

View File

@@ -26,20 +26,22 @@ rules:
unique-config-entry: done
# Silver
action-exceptions: todo
action-exceptions: done
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: Integration does not have an options flow.
docs-installation-parameters: done
entity-unavailable: todo
entity-unavailable:
status: done
comment: Handled by the coordinator.
integration-owner: done
log-when-unavailable: todo
parallel-updates: todo
log-when-unavailable:
status: done
comment: Handled by the coordinator.
parallel-updates: done
reauthentication-flow: done
test-coverage:
status: todo
comment: Platform tests for cover and sensor need to be implemented to reach 95% coverage.
test-coverage: done
# Gold
devices: done

View File

@@ -20,6 +20,8 @@ from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import AladdinConnectConfigEntry, AladdinConnectCoordinator
from .entity import AladdinConnectEntity
PARALLEL_UPDATES = 0
@dataclass(frozen=True, kw_only=True)
class AladdinConnectSensorEntityDescription(SensorEntityDescription):

View File

@@ -32,5 +32,13 @@
"title": "[%key:common::config_flow::title::reauth%]"
}
}
},
"exceptions": {
"close_door_failed": {
"message": "Failed to close the garage door"
},
"open_door_failed": {
"message": "Failed to open the garage door"
}
}
}

View File

@@ -60,9 +60,8 @@ class S3ConfigFlow(ConfigFlow, domain=DOMAIN):
}
)
if not urlparse(user_input[CONF_ENDPOINT_URL]).hostname.endswith(
AWS_DOMAIN
):
hostname = urlparse(user_input[CONF_ENDPOINT_URL]).hostname
if not hostname or not hostname.endswith(AWS_DOMAIN):
errors[CONF_ENDPOINT_URL] = "invalid_endpoint_url"
else:
try:

View File

@@ -0,0 +1,291 @@
"""The Brands integration."""
from __future__ import annotations
from collections import deque
from http import HTTPStatus
import logging
from pathlib import Path
from random import SystemRandom
import time
from typing import Any, Final
from aiohttp import ClientError, hdrs, web
import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.components.http import KEY_AUTHENTICATED, HomeAssistantView
from homeassistant.core import HomeAssistant, callback, valid_domain
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_custom_components
from .const import (
ALLOWED_IMAGES,
BRANDS_CDN_URL,
CACHE_TTL,
CATEGORY_RE,
CDN_TIMEOUT,
DOMAIN,
HARDWARE_IMAGE_RE,
IMAGE_FALLBACKS,
PLACEHOLDER,
TOKEN_CHANGE_INTERVAL,
)
_LOGGER = logging.getLogger(__name__)
_RND: Final = SystemRandom()
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Brands integration."""
access_tokens: deque[str] = deque([], 2)
access_tokens.append(hex(_RND.getrandbits(256))[2:])
hass.data[DOMAIN] = access_tokens
@callback
def _rotate_token(_now: Any) -> None:
"""Rotate the access token."""
access_tokens.append(hex(_RND.getrandbits(256))[2:])
async_track_time_interval(hass, _rotate_token, TOKEN_CHANGE_INTERVAL)
hass.http.register_view(BrandsIntegrationView(hass))
hass.http.register_view(BrandsHardwareView(hass))
websocket_api.async_register_command(hass, ws_access_token)
return True
@callback
@websocket_api.websocket_command({vol.Required("type"): "brands/access_token"})
def ws_access_token(
hass: HomeAssistant,
connection: websocket_api.ActiveConnection,
msg: dict[str, Any],
) -> None:
"""Return the current brands access token."""
access_tokens: deque[str] = hass.data[DOMAIN]
connection.send_result(msg["id"], {"token": access_tokens[-1]})
def _read_cached_file_with_marker(
cache_path: Path,
) -> tuple[bytes | None, float] | None:
"""Read a cached file, distinguishing between content and 404 markers.
Returns (content, mtime) where content is None for 404 markers (empty files).
Returns None if the file does not exist at all.
"""
if not cache_path.is_file():
return None
mtime = cache_path.stat().st_mtime
data = cache_path.read_bytes()
if not data:
# Empty file is a 404 marker
return (None, mtime)
return (data, mtime)
def _write_cache_file(cache_path: Path, data: bytes) -> None:
"""Write data to cache file, creating directories as needed."""
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_bytes(data)
def _read_brand_file(brand_dir: Path, image: str) -> bytes | None:
"""Read a brand image, trying fallbacks in a single I/O pass."""
for candidate in (image, *IMAGE_FALLBACKS.get(image, ())):
file_path = brand_dir / candidate
if file_path.is_file():
return file_path.read_bytes()
return None
class _BrandsBaseView(HomeAssistantView):
"""Base view for serving brand images."""
requires_auth = False
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the view."""
self._hass = hass
self._cache_dir = Path(hass.config.cache_path(DOMAIN))
def _authenticate(self, request: web.Request) -> None:
"""Authenticate the request using Bearer token or query token."""
access_tokens: deque[str] = self._hass.data[DOMAIN]
authenticated = (
request[KEY_AUTHENTICATED] or request.query.get("token") in access_tokens
)
if not authenticated:
if hdrs.AUTHORIZATION in request.headers:
raise web.HTTPUnauthorized
raise web.HTTPForbidden
async def _serve_from_custom_integration(
self,
domain: str,
image: str,
) -> web.Response | None:
"""Try to serve a brand image from a custom integration."""
custom_components = await async_get_custom_components(self._hass)
if (integration := custom_components.get(domain)) is None:
return None
if not integration.has_branding:
return None
brand_dir = Path(integration.file_path) / "brand"
data = await self._hass.async_add_executor_job(
_read_brand_file, brand_dir, image
)
if data is not None:
return self._build_response(data)
return None
async def _serve_from_cache_or_cdn(
self,
cdn_path: str,
cache_subpath: str,
*,
fallback_placeholder: bool = True,
) -> web.Response:
"""Serve from disk cache, fetching from CDN if needed."""
cache_path = self._cache_dir / cache_subpath
now = time.time()
# Try disk cache
result = await self._hass.async_add_executor_job(
_read_cached_file_with_marker, cache_path
)
if result is not None:
data, mtime = result
# Schedule background refresh if stale
if now - mtime > CACHE_TTL:
self._hass.async_create_background_task(
self._fetch_and_cache(cdn_path, cache_path),
f"brands_refresh_{cache_subpath}",
)
else:
# Cache miss - fetch from CDN
data = await self._fetch_and_cache(cdn_path, cache_path)
if data is None:
if fallback_placeholder:
return await self._serve_placeholder(
image=cache_subpath.rsplit("/", 1)[-1]
)
return web.Response(status=HTTPStatus.NOT_FOUND)
return self._build_response(data)
async def _fetch_and_cache(
self,
cdn_path: str,
cache_path: Path,
) -> bytes | None:
"""Fetch from CDN and write to cache. Returns data or None on 404."""
url = f"{BRANDS_CDN_URL}/{cdn_path}"
session = async_get_clientsession(self._hass)
try:
resp = await session.get(url, timeout=CDN_TIMEOUT)
except ClientError, TimeoutError:
_LOGGER.debug("Failed to fetch brand from CDN: %s", cdn_path)
return None
if resp.status == HTTPStatus.NOT_FOUND:
# Cache the 404 as empty file
await self._hass.async_add_executor_job(_write_cache_file, cache_path, b"")
return None
if resp.status != HTTPStatus.OK:
_LOGGER.debug("Unexpected CDN response %s for %s", resp.status, cdn_path)
return None
data = await resp.read()
await self._hass.async_add_executor_job(_write_cache_file, cache_path, data)
return data
async def _serve_placeholder(self, image: str) -> web.Response:
"""Serve a placeholder image."""
return await self._serve_from_cache_or_cdn(
cdn_path=f"_/{PLACEHOLDER}/{image}",
cache_subpath=f"integrations/{PLACEHOLDER}/{image}",
fallback_placeholder=False,
)
@staticmethod
def _build_response(data: bytes) -> web.Response:
"""Build a response with proper headers."""
return web.Response(
body=data,
content_type="image/png",
)
class BrandsIntegrationView(_BrandsBaseView):
"""Serve integration brand images."""
name = "api:brands:integration"
url = "/api/brands/integration/{domain}/{image}"
async def get(
self,
request: web.Request,
domain: str,
image: str,
) -> web.Response:
"""Handle GET request for an integration brand image."""
self._authenticate(request)
if not valid_domain(domain) or image not in ALLOWED_IMAGES:
return web.Response(status=HTTPStatus.NOT_FOUND)
use_placeholder = request.query.get("placeholder") != "no"
# 1. Try custom integration local files
if (
response := await self._serve_from_custom_integration(domain, image)
) is not None:
return response
# 2. Try cache / CDN (always use direct path for proper 404 caching)
return await self._serve_from_cache_or_cdn(
cdn_path=f"brands/{domain}/{image}",
cache_subpath=f"integrations/{domain}/{image}",
fallback_placeholder=use_placeholder,
)
class BrandsHardwareView(_BrandsBaseView):
"""Serve hardware brand images."""
name = "api:brands:hardware"
url = "/api/brands/hardware/{category}/{image:.+}"
async def get(
self,
request: web.Request,
category: str,
image: str,
) -> web.Response:
"""Handle GET request for a hardware brand image."""
self._authenticate(request)
if not CATEGORY_RE.match(category):
return web.Response(status=HTTPStatus.NOT_FOUND)
# Hardware images have dynamic names like "manufacturer_model.png"
# Validate it ends with .png and contains only safe characters
if not HARDWARE_IMAGE_RE.match(image):
return web.Response(status=HTTPStatus.NOT_FOUND)
cache_subpath = f"hardware/{category}/{image}"
return await self._serve_from_cache_or_cdn(
cdn_path=cache_subpath,
cache_subpath=cache_subpath,
)

View File

@@ -0,0 +1,57 @@
"""Constants for the Brands integration."""
from __future__ import annotations
from datetime import timedelta
import re
from typing import Final
from aiohttp import ClientTimeout
DOMAIN: Final = "brands"
# CDN
BRANDS_CDN_URL: Final = "https://brands.home-assistant.io"
CDN_TIMEOUT: Final = ClientTimeout(total=10)
PLACEHOLDER: Final = "_placeholder"
# Caching
CACHE_TTL: Final = 30 * 24 * 60 * 60 # 30 days in seconds
# Access token
TOKEN_CHANGE_INTERVAL: Final = timedelta(minutes=30)
# Validation
CATEGORY_RE: Final = re.compile(r"^[a-z0-9_]+$")
HARDWARE_IMAGE_RE: Final = re.compile(r"^[a-z0-9_-]+\.png$")
# Images and fallback chains
ALLOWED_IMAGES: Final = frozenset(
{
"icon.png",
"logo.png",
"icon@2x.png",
"logo@2x.png",
"dark_icon.png",
"dark_logo.png",
"dark_icon@2x.png",
"dark_logo@2x.png",
}
)
# Fallback chains for image resolution, mirroring the brands CDN build logic.
# When a requested image is not found, we try each fallback in order.
IMAGE_FALLBACKS: Final[dict[str, list[str]]] = {
"logo.png": ["icon.png"],
"icon@2x.png": ["icon.png"],
"logo@2x.png": ["logo.png", "icon.png"],
"dark_icon.png": ["icon.png"],
"dark_logo.png": ["dark_icon.png", "logo.png", "icon.png"],
"dark_icon@2x.png": ["icon@2x.png", "icon.png"],
"dark_logo@2x.png": [
"dark_icon@2x.png",
"logo@2x.png",
"logo.png",
"icon.png",
],
}

View File

@@ -0,0 +1,10 @@
{
"domain": "brands",
"name": "Brands",
"codeowners": ["@home-assistant/core"],
"config_flow": false,
"dependencies": ["http", "websocket_api"],
"documentation": "https://www.home-assistant.io/integrations/brands",
"integration_type": "system",
"quality_scale": "internal"
}

View File

@@ -8,7 +8,7 @@
"iot_class": "local_polling",
"loggers": ["bsblan"],
"quality_scale": "silver",
"requirements": ["python-bsblan==5.0.1"],
"requirements": ["python-bsblan==5.1.0"],
"zeroconf": [
{
"name": "bsb-lan*",

View File

@@ -38,7 +38,7 @@ async def _root_payload(
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="presets",
thumbnail="https://brands.home-assistant.io/_/cambridge_audio/logo.png",
thumbnail="/api/brands/integration/cambridge_audio/logo.png",
can_play=False,
can_expand=True,
)

View File

@@ -11,6 +11,7 @@ Wetterwarnungen (Stufe 1)
from __future__ import annotations
from datetime import UTC, datetime
from typing import Any
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
@@ -95,13 +96,25 @@ class DwdWeatherWarningsSensor(
entry_type=DeviceEntryType.SERVICE,
)
def _filter_expired_warnings(
self, warnings: list[dict[str, Any]] | None
) -> list[dict[str, Any]]:
if warnings is None:
return []
now = datetime.now(UTC)
return [warning for warning in warnings if warning[API_ATTR_WARNING_END] > now]
@property
def native_value(self) -> int | None:
"""Return the state of the sensor."""
if self.entity_description.key == CURRENT_WARNING_SENSOR:
return self.coordinator.api.current_warning_level
warnings = self.coordinator.api.current_warnings
else:
warnings = self.coordinator.api.expected_warnings
return self.coordinator.api.expected_warning_level
warnings = self._filter_expired_warnings(warnings)
return max((w.get(API_ATTR_WARNING_LEVEL, 0) for w in warnings), default=0)
@property
def extra_state_attributes(self) -> dict[str, Any]:
@@ -117,6 +130,7 @@ class DwdWeatherWarningsSensor(
else:
searched_warnings = self.coordinator.api.expected_warnings
searched_warnings = self._filter_expired_warnings(searched_warnings)
data[ATTR_WARNING_COUNT] = len(searched_warnings)
for i, warning in enumerate(searched_warnings, 1):

View File

@@ -304,7 +304,7 @@ def base_owntone_library() -> BrowseMedia:
can_play=False,
can_expand=True,
children=children,
thumbnail="https://brands.home-assistant.io/_/forked_daapd/logo.png",
thumbnail="/api/brands/integration/forked_daapd/logo.png",
)
@@ -321,7 +321,7 @@ def library(other: Sequence[BrowseMedia] | None) -> BrowseMedia:
media_content_type=MediaType.APP,
can_play=False,
can_expand=True,
thumbnail="https://brands.home-assistant.io/_/forked_daapd/logo.png",
thumbnail="/api/brands/integration/forked_daapd/logo.png",
)
]
if other:

View File

@@ -297,6 +297,9 @@ class Panel:
# If the panel should only be visible to admins
require_admin = False
# If the panel should be shown in the sidebar
show_in_sidebar = True
# If the panel is a configuration panel for a integration
config_panel_domain: str | None = None
@@ -310,6 +313,7 @@ class Panel:
config: dict[str, Any] | None,
require_admin: bool,
config_panel_domain: str | None,
show_in_sidebar: bool,
) -> None:
"""Initialize a built-in panel."""
self.component_name = component_name
@@ -319,6 +323,7 @@ class Panel:
self.config = config
self.require_admin = require_admin
self.config_panel_domain = config_panel_domain
self.show_in_sidebar = show_in_sidebar
self.sidebar_default_visible = sidebar_default_visible
@callback
@@ -335,18 +340,17 @@ class Panel:
"url_path": self.frontend_url_path,
"require_admin": self.require_admin,
"config_panel_domain": self.config_panel_domain,
"show_in_sidebar": self.show_in_sidebar,
}
if config_override:
if "require_admin" in config_override:
response["require_admin"] = config_override["require_admin"]
if config_override.get("show_in_sidebar") is False:
response["title"] = None
response["icon"] = None
else:
if "icon" in config_override:
response["icon"] = config_override["icon"]
if "title" in config_override:
response["title"] = config_override["title"]
if "show_in_sidebar" in config_override:
response["show_in_sidebar"] = config_override["show_in_sidebar"]
if "icon" in config_override:
response["icon"] = config_override["icon"]
if "title" in config_override:
response["title"] = config_override["title"]
return response
@@ -364,6 +368,7 @@ def async_register_built_in_panel(
*,
update: bool = False,
config_panel_domain: str | None = None,
show_in_sidebar: bool = True,
) -> None:
"""Register a built-in panel."""
panel = Panel(
@@ -375,6 +380,7 @@ def async_register_built_in_panel(
config,
require_admin,
config_panel_domain,
show_in_sidebar,
)
panels = hass.data.setdefault(DATA_PANELS, {})
@@ -570,28 +576,28 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"light",
sidebar_icon="mdi:lamps",
sidebar_title="light",
sidebar_default_visible=False,
show_in_sidebar=False,
)
async_register_built_in_panel(
hass,
"security",
sidebar_icon="mdi:security",
sidebar_title="security",
sidebar_default_visible=False,
show_in_sidebar=False,
)
async_register_built_in_panel(
hass,
"climate",
sidebar_icon="mdi:home-thermometer",
sidebar_title="climate",
sidebar_default_visible=False,
show_in_sidebar=False,
)
async_register_built_in_panel(
hass,
"home",
sidebar_icon="mdi:home",
sidebar_title="home",
sidebar_default_visible=False,
show_in_sidebar=False,
)
async_register_built_in_panel(hass, "profile")
@@ -1085,3 +1091,4 @@ class PanelResponse(TypedDict):
url_path: str
require_admin: bool
config_panel_domain: str | None
show_in_sidebar: bool

View File

@@ -21,5 +21,5 @@
"integration_type": "system",
"preview_features": { "winter_mode": {} },
"quality_scale": "internal",
"requirements": ["home-assistant-frontend==20260128.6"]
"requirements": ["home-assistant-frontend==20260225.0"]
}

View File

@@ -19,6 +19,8 @@ from homeassistant.const import (
CONF_SSL,
CONF_VERIFY_SSL,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
@@ -27,6 +29,34 @@ from homeassistant.helpers.service_info.mqtt import MqttServiceInfo
from .const import DEFAULT_PORT, DOMAIN, LOGGER
async def _validate_input(hass: HomeAssistant, data: dict[str, Any]) -> Any:
"""Validate the user input allows us to connect."""
fully = FullyKiosk(
async_get_clientsession(hass),
data[CONF_HOST],
DEFAULT_PORT,
data[CONF_PASSWORD],
use_ssl=data[CONF_SSL],
verify_ssl=data[CONF_VERIFY_SSL],
)
try:
async with asyncio.timeout(15):
device_info = await fully.getDeviceInfo()
except (
ClientConnectorError,
FullyKioskError,
TimeoutError,
) as error:
LOGGER.debug(error.args, exc_info=True)
raise CannotConnect from error
except Exception as error: # pylint: disable=broad-except
LOGGER.exception("Unexpected exception")
raise UnknownError from error
return device_info
class FullyKioskConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Fully Kiosk Browser."""
@@ -43,58 +73,42 @@ class FullyKioskConfigFlow(ConfigFlow, domain=DOMAIN):
host: str,
user_input: dict[str, Any],
errors: dict[str, str],
description_placeholders: dict[str, str] | Any = None,
) -> ConfigFlowResult | None:
fully = FullyKiosk(
async_get_clientsession(self.hass),
host,
DEFAULT_PORT,
user_input[CONF_PASSWORD],
use_ssl=user_input[CONF_SSL],
verify_ssl=user_input[CONF_VERIFY_SSL],
)
"""Create a config entry."""
self._async_abort_entries_match({CONF_HOST: host})
try:
async with asyncio.timeout(15):
device_info = await fully.getDeviceInfo()
except (
ClientConnectorError,
FullyKioskError,
TimeoutError,
) as error:
LOGGER.debug(error.args, exc_info=True)
device_info = await _validate_input(
self.hass, {**user_input, CONF_HOST: host}
)
except CannotConnect:
errors["base"] = "cannot_connect"
description_placeholders["error_detail"] = str(error.args)
return None
except Exception as error: # noqa: BLE001
LOGGER.exception("Unexpected exception: %s", error)
except UnknownError:
errors["base"] = "unknown"
description_placeholders["error_detail"] = str(error.args)
return None
await self.async_set_unique_id(device_info["deviceID"], raise_on_progress=False)
self._abort_if_unique_id_configured(updates=user_input)
return self.async_create_entry(
title=device_info["deviceName"],
data={
CONF_HOST: host,
CONF_PASSWORD: user_input[CONF_PASSWORD],
CONF_MAC: format_mac(device_info["Mac"]),
CONF_SSL: user_input[CONF_SSL],
CONF_VERIFY_SSL: user_input[CONF_VERIFY_SSL],
},
)
else:
await self.async_set_unique_id(
device_info["deviceID"], raise_on_progress=False
)
self._abort_if_unique_id_configured(updates=user_input)
return self.async_create_entry(
title=device_info["deviceName"],
data={
CONF_HOST: host,
CONF_PASSWORD: user_input[CONF_PASSWORD],
CONF_MAC: format_mac(device_info["Mac"]),
CONF_SSL: user_input[CONF_SSL],
CONF_VERIFY_SSL: user_input[CONF_VERIFY_SSL],
},
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
placeholders: dict[str, str] = {}
if user_input is not None:
result = await self._create_entry(
user_input[CONF_HOST], user_input, errors, placeholders
)
result = await self._create_entry(user_input[CONF_HOST], user_input, errors)
if result:
return result
@@ -108,7 +122,6 @@ class FullyKioskConfigFlow(ConfigFlow, domain=DOMAIN):
vol.Optional(CONF_VERIFY_SSL, default=False): bool,
}
),
description_placeholders=placeholders,
errors=errors,
)
@@ -171,3 +184,66 @@ class FullyKioskConfigFlow(ConfigFlow, domain=DOMAIN):
self.host = device_info["hostname4"]
self._discovered_device_info = device_info
return await self.async_step_discovery_confirm()
async def async_step_reconfigure(
self, user_input: dict[str, Any]
) -> ConfigFlowResult:
"""Handle reconfiguration of an existing config entry."""
errors: dict[str, str] = {}
reconf_entry = self._get_reconfigure_entry()
suggested_values = {
CONF_HOST: reconf_entry.data[CONF_HOST],
CONF_PASSWORD: reconf_entry.data[CONF_PASSWORD],
CONF_SSL: reconf_entry.data[CONF_SSL],
CONF_VERIFY_SSL: reconf_entry.data[CONF_VERIFY_SSL],
}
if user_input:
try:
device_info = await _validate_input(
self.hass,
data={
**reconf_entry.data,
**user_input,
},
)
except CannotConnect:
errors["base"] = "cannot_connect"
except UnknownError:
errors["base"] = "unknown"
else:
await self.async_set_unique_id(
device_info["deviceID"], raise_on_progress=False
)
self._abort_if_unique_id_mismatch()
return self.async_update_reload_and_abort(
reconf_entry,
data_updates={
**reconf_entry.data,
**user_input,
},
)
return self.async_show_form(
step_id="reconfigure",
data_schema=self.add_suggested_values_to_schema(
data_schema=vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Required(CONF_PASSWORD): str,
vol.Optional(CONF_SSL, default=False): bool,
vol.Optional(CONF_VERIFY_SSL, default=False): bool,
}
),
suggested_values=user_input or suggested_values,
),
errors=errors,
)
class CannotConnect(HomeAssistantError):
"""Error to indicate we cannot connect to the Fully Kiosk device."""
class UnknownError(HomeAssistantError):
"""Error to indicate an unknown error occurred."""

View File

@@ -6,11 +6,13 @@
},
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]"
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]",
"unique_id_mismatch": "Please ensure you reconfigure the same device."
},
"error": {
"cannot_connect": "Cannot connect. Details: {error_detail}",
"unknown": "Unknown. Details: {error_detail}"
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"discovery_confirm": {
@@ -26,6 +28,20 @@
},
"description": "Do you want to set up {name} ({host})?"
},
"reconfigure": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
"password": "[%key:common::config_flow::data::password%]",
"ssl": "[%key:common::config_flow::data::ssl%]",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"data_description": {
"host": "The hostname or IP address of the device running your Fully Kiosk Browser application.",
"password": "[%key:component::fully_kiosk::common::data_description_password%]",
"ssl": "[%key:component::fully_kiosk::common::data_description_ssl%]",
"verify_ssl": "[%key:component::fully_kiosk::common::data_description_verify_ssl%]"
}
},
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]",

View File

@@ -207,7 +207,7 @@ class SupervisorOSUpdateEntity(HassioOSEntity, UpdateEntity):
@property
def entity_picture(self) -> str | None:
"""Return the icon of the entity."""
return "https://brands.home-assistant.io/homeassistant/icon.png"
return "/api/brands/integration/homeassistant/icon.png?placeholder=no"
@property
def release_url(self) -> str | None:
@@ -258,7 +258,7 @@ class SupervisorSupervisorUpdateEntity(HassioSupervisorEntity, UpdateEntity):
@property
def entity_picture(self) -> str | None:
"""Return the icon of the entity."""
return "https://brands.home-assistant.io/hassio/icon.png"
return "/api/brands/integration/hassio/icon.png?placeholder=no"
async def async_install(
self, version: str | None, backup: bool, **kwargs: Any
@@ -296,7 +296,7 @@ class SupervisorCoreUpdateEntity(HassioCoreEntity, UpdateEntity):
@property
def entity_picture(self) -> str | None:
"""Return the icon of the entity."""
return "https://brands.home-assistant.io/homeassistant/icon.png"
return "/api/brands/integration/homeassistant/icon.png?placeholder=no"
@property
def release_url(self) -> str | None:

View File

@@ -7,7 +7,7 @@
"integration_type": "device",
"iot_class": "local_polling",
"quality_scale": "platinum",
"requirements": ["hdfury==1.5.0"],
"requirements": ["hdfury==1.6.0"],
"zeroconf": [
{ "name": "diva-*", "type": "_http._tcp.local." },
{ "name": "vertex2-*", "type": "_http._tcp.local." },

View File

@@ -18,6 +18,7 @@ PLATFORMS = [
Platform.LIGHT,
Platform.LOCK,
Platform.SENSOR,
Platform.SIREN,
Platform.SWITCH,
Platform.VALVE,
Platform.WEATHER,

View File

@@ -0,0 +1,86 @@
"""Support for HomematicIP Cloud sirens."""
from __future__ import annotations
import logging
from typing import Any
from homematicip.base.functionalChannels import NotificationMp3SoundChannel
from homematicip.device import CombinationSignallingDevice
from homeassistant.components.siren import (
ATTR_TONE,
ATTR_VOLUME_LEVEL,
SirenEntity,
SirenEntityFeature,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .entity import HomematicipGenericEntity
from .hap import HomematicIPConfigEntry, HomematicipHAP
_logger = logging.getLogger(__name__)
# Map tone integers to HmIP sound file strings
_TONE_TO_SOUNDFILE: dict[int, str] = {0: "INTERNAL_SOUNDFILE"}
_TONE_TO_SOUNDFILE.update({i: f"SOUNDFILE_{i:03d}" for i in range(1, 253)})
# Available tones as dict[int, str] for HA UI
AVAILABLE_TONES: dict[int, str] = {0: "Internal"}
AVAILABLE_TONES.update({i: f"Sound {i}" for i in range(1, 253)})
async def async_setup_entry(
hass: HomeAssistant,
config_entry: HomematicIPConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the HomematicIP Cloud sirens from a config entry."""
hap = config_entry.runtime_data
async_add_entities(
HomematicipMP3Siren(hap, device)
for device in hap.home.devices
if isinstance(device, CombinationSignallingDevice)
)
class HomematicipMP3Siren(HomematicipGenericEntity, SirenEntity):
"""Representation of the HomematicIP MP3 siren (HmIP-MP3P)."""
_attr_available_tones = AVAILABLE_TONES
_attr_supported_features = (
SirenEntityFeature.TURN_ON
| SirenEntityFeature.TURN_OFF
| SirenEntityFeature.TONES
| SirenEntityFeature.VOLUME_SET
)
def __init__(
self, hap: HomematicipHAP, device: CombinationSignallingDevice
) -> None:
"""Initialize the siren entity."""
super().__init__(hap, device, post="Siren", channel=1, is_multi_channel=False)
@property
def _func_channel(self) -> NotificationMp3SoundChannel:
return self._device.functionalChannels[self._channel]
@property
def is_on(self) -> bool:
"""Return true if siren is playing."""
return self._func_channel.playingFileActive
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the siren on."""
tone = kwargs.get(ATTR_TONE, 0)
volume_level = kwargs.get(ATTR_VOLUME_LEVEL, 1.0)
sound_file = _TONE_TO_SOUNDFILE.get(tone, "INTERNAL_SOUNDFILE")
await self._func_channel.set_sound_file_volume_level_async(
sound_file=sound_file, volume_level=volume_level
)
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the siren off."""
await self._func_channel.stop_sound_async()

View File

@@ -1,7 +1,7 @@
{
"domain": "homevolt",
"name": "Homevolt",
"codeowners": ["@danielhiversen"],
"codeowners": ["@danielhiversen", "@liudger"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/homevolt",
"integration_type": "device",

View File

@@ -13,7 +13,7 @@
"requirements": [
"xknx==3.15.0",
"xknxproject==3.8.2",
"knx-frontend==2026.2.13.222258"
"knx-frontend==2026.2.25.165736"
],
"single_config_entry": true
}

View File

@@ -219,7 +219,7 @@ async def library_payload(hass):
)
for child in library_info.children:
child.thumbnail = "https://brands.home-assistant.io/_/kodi/logo.png"
child.thumbnail = "/api/brands/integration/kodi/logo.png"
with contextlib.suppress(BrowseError):
item = await media_source.async_browse_media(

View File

@@ -353,14 +353,13 @@ def _register_panel(
kwargs = {
"frontend_url_path": url_path,
"require_admin": config[CONF_REQUIRE_ADMIN],
"show_in_sidebar": config[CONF_SHOW_IN_SIDEBAR],
"sidebar_title": config[CONF_TITLE],
"sidebar_icon": config.get(CONF_ICON, DEFAULT_ICON),
"config": {"mode": mode},
"update": update,
}
if config[CONF_SHOW_IN_SIDEBAR]:
kwargs["sidebar_title"] = config[CONF_TITLE]
kwargs["sidebar_icon"] = config.get(CONF_ICON, DEFAULT_ICON)
frontend.async_register_built_in_panel(hass, DOMAIN, **kwargs)

View File

@@ -42,7 +42,7 @@ async def async_get_media_browser_root_object(
media_class=MediaClass.APP,
media_content_id="",
media_content_type=DOMAIN,
thumbnail="https://brands.home-assistant.io/_/lovelace/logo.png",
thumbnail="/api/brands/integration/lovelace/logo.png",
can_play=False,
can_expand=True,
)
@@ -72,7 +72,7 @@ async def async_browse_media(
media_class=MediaClass.APP,
media_content_id=DEFAULT_DASHBOARD,
media_content_type=DOMAIN,
thumbnail="https://brands.home-assistant.io/_/lovelace/logo.png",
thumbnail="/api/brands/integration/lovelace/logo.png",
can_play=True,
can_expand=False,
)
@@ -104,7 +104,7 @@ async def async_browse_media(
media_class=MediaClass.APP,
media_content_id=f"{info['url_path']}/{view['path']}",
media_content_type=DOMAIN,
thumbnail="https://brands.home-assistant.io/_/lovelace/logo.png",
thumbnail="/api/brands/integration/lovelace/logo.png",
can_play=True,
can_expand=False,
)
@@ -213,7 +213,7 @@ def _item_from_info(info: dict) -> BrowseMedia:
media_class=MediaClass.APP,
media_content_id=info["url_path"],
media_content_type=DOMAIN,
thumbnail="https://brands.home-assistant.io/_/lovelace/logo.png",
thumbnail="/api/brands/integration/lovelace/logo.png",
can_play=True,
can_expand=len(info["views"]) > 1,
)

View File

@@ -69,34 +69,37 @@ class MatterValve(MatterEntity, ValveEntity):
def _update_from_device(self) -> None:
"""Update from device."""
self._calculate_features()
current_state: int
self._attr_is_opening = False
self._attr_is_closing = False
current_state: int | None
current_state = self.get_matter_attribute_value(
ValveConfigurationAndControl.Attributes.CurrentState
)
target_state: int
target_state: int | None
target_state = self.get_matter_attribute_value(
ValveConfigurationAndControl.Attributes.TargetState
)
if (
current_state == ValveStateEnum.kTransitioning
and target_state == ValveStateEnum.kOpen
if current_state is None:
self._attr_is_closed = None
elif current_state == ValveStateEnum.kTransitioning and (
target_state == ValveStateEnum.kOpen
):
self._attr_is_opening = True
self._attr_is_closing = False
elif (
current_state == ValveStateEnum.kTransitioning
and target_state == ValveStateEnum.kClosed
self._attr_is_closed = None
elif current_state == ValveStateEnum.kTransitioning and (
target_state == ValveStateEnum.kClosed
):
self._attr_is_opening = False
self._attr_is_closing = True
self._attr_is_closed = None
elif current_state == ValveStateEnum.kClosed:
self._attr_is_opening = False
self._attr_is_closing = False
self._attr_is_closed = True
else:
self._attr_is_opening = False
self._attr_is_closing = False
elif current_state == ValveStateEnum.kOpen:
self._attr_is_closed = False
else:
self._attr_is_closed = None
# handle optional position
if self.supported_features & ValveEntityFeature.SET_POSITION:
self._attr_current_valve_position = self.get_matter_attribute_value(
@@ -145,6 +148,7 @@ DISCOVERY_SCHEMAS = [
ValveConfigurationAndControl.Attributes.CurrentState,
ValveConfigurationAndControl.Attributes.TargetState,
),
allow_none_value=True,
optional_attributes=(ValveConfigurationAndControl.Attributes.CurrentLevel,),
device_type=(device_types.WaterValve,),
),

View File

@@ -83,7 +83,7 @@ class MediaSourceItem:
identifier=None,
media_class=MediaClass.APP,
media_content_type=MediaType.APP,
thumbnail=f"https://brands.home-assistant.io/_/{source.domain}/logo.png",
thumbnail=f"/api/brands/integration/{source.domain}/logo.png",
title=source.name,
can_play=False,
can_expand=True,

View File

@@ -20,11 +20,11 @@ from .coordinator import NintendoParentalControlsConfigEntry, NintendoUpdateCoor
from .services import async_setup_services
_PLATFORMS: list[Platform] = [
Platform.SENSOR,
Platform.TIME,
Platform.SWITCH,
Platform.NUMBER,
Platform.SELECT,
Platform.SENSOR,
Platform.SWITCH,
Platform.TIME,
]
PLATFORM_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)

View File

@@ -23,7 +23,7 @@ async def async_get_media_browser_root_object(
media_class=MediaClass.APP,
media_content_id="",
media_content_type="plex",
thumbnail="https://brands.home-assistant.io/_/plex/logo.png",
thumbnail="/api/brands/integration/plex/logo.png",
can_play=False,
can_expand=True,
)

View File

@@ -94,7 +94,7 @@ def browse_media( # noqa: C901
can_expand=True,
children=[],
children_media_class=MediaClass.DIRECTORY,
thumbnail="https://brands.home-assistant.io/_/plex/logo.png",
thumbnail="/api/brands/integration/plex/logo.png",
)
if platform != "sonos":
server_info.children.append(

View File

@@ -29,9 +29,9 @@ from .services import async_setup_services
_PLATFORMS: list[Platform] = [
Platform.BINARY_SENSOR,
Platform.BUTTON,
Platform.SENSOR,
Platform.SWITCH,
Platform.BUTTON,
]
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)

View File

@@ -41,8 +41,8 @@ class PortainerSwitchEntityDescription(SwitchEntityDescription):
"""Class to hold Portainer switch description."""
is_on_fn: Callable[[PortainerContainerData], bool | None]
turn_on_fn: Callable[[str, Portainer, int, str], Coroutine[Any, Any, None]]
turn_off_fn: Callable[[str, Portainer, int, str], Coroutine[Any, Any, None]]
turn_on_fn: Callable[[Portainer], Callable[[int, str], Coroutine[Any, Any, None]]]
turn_off_fn: Callable[[Portainer], Callable[[int, str], Coroutine[Any, Any, None]]]
@dataclass(frozen=True, kw_only=True)
@@ -50,53 +50,20 @@ class PortainerStackSwitchEntityDescription(SwitchEntityDescription):
"""Class to hold Portainer stack switch description."""
is_on_fn: Callable[[PortainerStackData], bool | None]
turn_on_fn: Callable[[str, Portainer, int, int], Coroutine[Any, Any, None]]
turn_off_fn: Callable[[str, Portainer, int, int], Coroutine[Any, Any, None]]
turn_on_fn: Callable[[Portainer], Callable[..., Coroutine[Any, Any, Any]]]
turn_off_fn: Callable[[Portainer], Callable[..., Coroutine[Any, Any, Any]]]
PARALLEL_UPDATES = 1
async def perform_container_action(
action: str, portainer: Portainer, endpoint_id: int, container_id: str
async def _perform_action(
coordinator: PortainerCoordinator,
coroutine: Coroutine[Any, Any, Any],
) -> None:
"""Perform an action on a container."""
"""Perform a Portainer action with error handling and coordinator refresh."""
try:
match action:
case "start":
await portainer.start_container(endpoint_id, container_id)
case "stop":
await portainer.stop_container(endpoint_id, container_id)
except PortainerAuthenticationError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="invalid_auth",
translation_placeholders={"error": repr(err)},
) from err
except PortainerConnectionError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="cannot_connect",
translation_placeholders={"error": repr(err)},
) from err
except PortainerTimeoutError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="timeout_connect",
translation_placeholders={"error": repr(err)},
) from err
async def perform_stack_action(
action: str, portainer: Portainer, endpoint_id: int, stack_id: int
) -> None:
"""Perform an action on a stack."""
try:
match action:
case "start":
await portainer.start_stack(stack_id, endpoint_id)
case "stop":
await portainer.stop_stack(stack_id, endpoint_id)
await coroutine
except PortainerAuthenticationError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
@@ -112,6 +79,8 @@ async def perform_stack_action(
translation_domain=DOMAIN,
translation_key="timeout_connect_no_details",
) from err
else:
await coordinator.async_request_refresh()
CONTAINER_SWITCHES: tuple[PortainerSwitchEntityDescription, ...] = (
@@ -120,8 +89,8 @@ CONTAINER_SWITCHES: tuple[PortainerSwitchEntityDescription, ...] = (
translation_key="container",
device_class=SwitchDeviceClass.SWITCH,
is_on_fn=lambda data: data.container.state == "running",
turn_on_fn=perform_container_action,
turn_off_fn=perform_container_action,
turn_on_fn=lambda portainer: portainer.start_container,
turn_off_fn=lambda portainer: portainer.stop_container,
),
)
@@ -131,8 +100,8 @@ STACK_SWITCHES: tuple[PortainerStackSwitchEntityDescription, ...] = (
translation_key="stack",
device_class=SwitchDeviceClass.SWITCH,
is_on_fn=lambda data: data.stack.status == STACK_STATUS_ACTIVE,
turn_on_fn=perform_stack_action,
turn_off_fn=perform_stack_action,
turn_on_fn=lambda portainer: portainer.start_stack,
turn_off_fn=lambda portainer: portainer.stop_stack,
),
)
@@ -218,23 +187,21 @@ class PortainerContainerSwitch(PortainerContainerEntity, SwitchEntity):
async def async_turn_on(self, **kwargs: Any) -> None:
"""Start (turn on) the container."""
await self.entity_description.turn_on_fn(
"start",
self.coordinator.portainer,
self.endpoint_id,
self.container_data.container.id,
await _perform_action(
self.coordinator,
self.entity_description.turn_on_fn(self.coordinator.portainer)(
self.endpoint_id, self.container_data.container.id
),
)
await self.coordinator.async_request_refresh()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Stop (turn off) the container."""
await self.entity_description.turn_off_fn(
"stop",
self.coordinator.portainer,
self.endpoint_id,
self.container_data.container.id,
await _perform_action(
self.coordinator,
self.entity_description.turn_off_fn(self.coordinator.portainer)(
self.endpoint_id, self.container_data.container.id
),
)
await self.coordinator.async_request_refresh()
class PortainerStackSwitch(PortainerStackEntity, SwitchEntity):
@@ -262,20 +229,18 @@ class PortainerStackSwitch(PortainerStackEntity, SwitchEntity):
async def async_turn_on(self, **kwargs: Any) -> None:
"""Start (turn on) the stack."""
await self.entity_description.turn_on_fn(
"start",
self.coordinator.portainer,
self.endpoint_id,
self.stack_data.stack.id,
await _perform_action(
self.coordinator,
self.entity_description.turn_on_fn(self.coordinator.portainer)(
self.endpoint_id, self.stack_data.stack.id
),
)
await self.coordinator.async_request_refresh()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Stop (turn off) the stack."""
await self.entity_description.turn_off_fn(
"stop",
self.coordinator.portainer,
self.endpoint_id,
self.stack_data.stack.id,
await _perform_action(
self.coordinator,
self.entity_description.turn_off_fn(self.coordinator.portainer)(
self.endpoint_id, self.stack_data.stack.id
),
)
await self.coordinator.async_request_refresh()

View File

@@ -131,7 +131,7 @@ async def root_payload(
)
for child in children:
child.thumbnail = "https://brands.home-assistant.io/_/roku/logo.png"
child.thumbnail = "/api/brands/integration/roku/logo.png"
try:
browse_item = await media_source.async_browse_media(hass, None)

View File

@@ -35,7 +35,7 @@ async def _root_payload(
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="presets",
thumbnail="https://brands.home-assistant.io/_/russound_rio/logo.png",
thumbnail="/api/brands/integration/russound_rio/logo.png",
can_play=False,
can_expand=True,
)

View File

@@ -100,6 +100,7 @@ class SmartTubOnline(SmartTubOnboardSensorBase, BinarySensorEntity):
_attr_device_class = BinarySensorDeviceClass.CONNECTIVITY
# This seems to be very noisy and not generally useful, so disable by default.
_attr_entity_registry_enabled_default = False
_attr_translation_key = "online"
def __init__(
self, coordinator: DataUpdateCoordinator[dict[str, Any]], spa: Spa
@@ -117,6 +118,7 @@ class SmartTubReminder(SmartTubEntity, BinarySensorEntity):
"""Reminders for maintenance actions."""
_attr_device_class = BinarySensorDeviceClass.PROBLEM
_attr_translation_key = "reminder"
def __init__(
self,
@@ -132,6 +134,9 @@ class SmartTubReminder(SmartTubEntity, BinarySensorEntity):
)
self.reminder_id = reminder.id
self._attr_unique_id = f"{spa.id}-reminder-{reminder.id}"
self._attr_translation_placeholders = {
"reminder_name": reminder.name.title(),
}
@property
def reminder(self) -> SpaReminder:
@@ -169,6 +174,7 @@ class SmartTubError(SmartTubEntity, BinarySensorEntity):
"""
_attr_device_class = BinarySensorDeviceClass.PROBLEM
_attr_translation_key = "error"
def __init__(
self, coordinator: DataUpdateCoordinator[dict[str, Any]], spa: Spa
@@ -213,6 +219,7 @@ class SmartTubCoverSensor(SmartTubExternalSensorBase, BinarySensorEntity):
"""Wireless magnetic cover sensor."""
_attr_device_class = BinarySensorDeviceClass.OPENING
_attr_translation_key = "cover_sensor"
@property
def is_on(self) -> bool:

View File

@@ -74,6 +74,7 @@ class SmartTubThermostat(SmartTubEntity, ClimateEntity):
_attr_min_temp = DEFAULT_MIN_TEMP
_attr_max_temp = DEFAULT_MAX_TEMP
_attr_preset_modes = list(PRESET_MODES.values())
_attr_translation_key = "thermostat"
def __init__(
self, coordinator: DataUpdateCoordinator[dict[str, Any]], spa: Spa

View File

@@ -17,6 +17,8 @@ from .helpers import get_spa_name
class SmartTubEntity(CoordinatorEntity):
"""Base class for SmartTub entities."""
_attr_has_entity_name = True
def __init__(
self,
coordinator: DataUpdateCoordinator[dict[str, Any]],
@@ -36,9 +38,8 @@ class SmartTubEntity(CoordinatorEntity):
identifiers={(DOMAIN, spa.id)},
manufacturer=spa.brand,
model=spa.model,
name=get_spa_name(spa),
)
spa_name = get_spa_name(self.spa)
self._attr_name = f"{spa_name} {entity_name}"
@property
def spa_status(self) -> SpaState:
@@ -70,6 +71,8 @@ class SmartTubOnboardSensorBase(SmartTubEntity):
class SmartTubExternalSensorBase(SmartTubEntity):
"""Class for additional BLE wireless sensors sold separately."""
_attr_translation_key = "external_sensor"
def __init__(
self,
coordinator: DataUpdateCoordinator[dict[str, Any]],
@@ -77,12 +80,21 @@ class SmartTubExternalSensorBase(SmartTubEntity):
sensor: SpaSensor,
) -> None:
"""Initialize the external sensor entity."""
super().__init__(coordinator, spa, self._sensor_key(sensor))
self.sensor_address = sensor.address
self._attr_unique_id = f"{spa.id}-externalsensor-{sensor.address}"
super().__init__(coordinator, spa, self._human_readable_name(sensor))
self._attr_translation_placeholders = {
"sensor_name": self._human_readable_name(sensor),
}
@staticmethod
def _sensor_key(sensor: SpaSensor) -> str:
"""Return a key for the sensor suitable for unique_id generation."""
return sensor.name.strip("{}").replace("-", "_")
@staticmethod
def _human_readable_name(sensor: SpaSensor) -> str:
"""Return a human-readable name for the sensor."""
return " ".join(
word.capitalize() for word in sensor.name.strip("{}").split("-")
)

View File

@@ -19,7 +19,6 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import ATTR_LIGHTS, DEFAULT_LIGHT_BRIGHTNESS, DEFAULT_LIGHT_EFFECT
from .controller import SmartTubConfigEntry
from .entity import SmartTubEntity
from .helpers import get_spa_name
PARALLEL_UPDATES = 0
@@ -56,8 +55,8 @@ class SmartTubLight(SmartTubEntity, LightEntity):
super().__init__(coordinator, light.spa, "light")
self.light_zone = light.zone
self._attr_unique_id = f"{super().unique_id}-{light.zone}"
spa_name = get_spa_name(self.spa)
self._attr_name = f"{spa_name} Light {light.zone}"
self._attr_translation_key = "light_zone"
self._attr_translation_placeholders = {"zone": str(light.zone)}
@property
def light(self) -> SpaLight:

View File

@@ -95,6 +95,17 @@ async def async_setup_entry(
class SmartTubBuiltinSensor(SmartTubOnboardSensorBase, SensorEntity):
"""Generic class for SmartTub status sensors."""
def __init__(
self,
coordinator: DataUpdateCoordinator[dict[str, Any]],
spa: smarttub.Spa,
sensor_name: str,
state_key: str,
) -> None:
"""Initialize the entity."""
super().__init__(coordinator, spa, sensor_name, state_key)
self._attr_translation_key = state_key
@property
def native_value(self) -> str | None:
"""Return the current state of the sensor."""
@@ -117,6 +128,7 @@ class SmartTubPrimaryFiltrationCycle(SmartTubBuiltinSensor):
super().__init__(
coordinator, spa, "Primary Filtration Cycle", "primary_filtration"
)
self._attr_translation_key = "primary_filtration_cycle"
@property
def cycle(self) -> smarttub.SpaPrimaryFiltrationCycle:
@@ -157,6 +169,7 @@ class SmartTubSecondaryFiltrationCycle(SmartTubBuiltinSensor):
super().__init__(
coordinator, spa, "Secondary Filtration Cycle", "secondary_filtration"
)
self._attr_translation_key = "secondary_filtration_cycle"
@property
def cycle(self) -> smarttub.SpaSecondaryFiltrationCycle:

View File

@@ -34,6 +34,69 @@
}
}
},
"entity": {
"binary_sensor": {
"cover_sensor": {
"name": "Cover sensor"
},
"error": {
"name": "Error"
},
"online": {
"name": "Online"
},
"reminder": {
"name": "{reminder_name} reminder"
}
},
"climate": {
"thermostat": {
"name": "Thermostat"
}
},
"light": {
"light_zone": {
"name": "Light {zone}"
}
},
"sensor": {
"blowout_cycle": {
"name": "Blowout cycle"
},
"cleanup_cycle": {
"name": "Cleanup cycle"
},
"flow_switch": {
"name": "Flow switch"
},
"ozone": {
"name": "Ozone"
},
"primary_filtration_cycle": {
"name": "Primary filtration cycle"
},
"secondary_filtration_cycle": {
"name": "Secondary filtration cycle"
},
"state": {
"name": "State"
},
"uv": {
"name": "UV"
}
},
"switch": {
"circulation_pump": {
"name": "Circulation pump"
},
"jet": {
"name": "Jet {pump_id}"
},
"pump": {
"name": "Pump {pump_id}"
}
}
},
"services": {
"reset_reminder": {
"description": "Resets the maintenance reminder on a hot tub.",

View File

@@ -13,7 +13,6 @@ from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from .const import API_TIMEOUT, ATTR_PUMPS
from .controller import SmartTubConfigEntry
from .entity import SmartTubEntity
from .helpers import get_spa_name
PARALLEL_UPDATES = 0
@@ -47,22 +46,20 @@ class SmartTubPump(SmartTubEntity, SwitchEntity):
self.pump_id = pump.id
self.pump_type = pump.type
self._attr_unique_id = f"{super().unique_id}-{pump.id}"
if pump.type == SpaPump.PumpType.CIRCULATION:
self._attr_translation_key = "circulation_pump"
elif pump.type == SpaPump.PumpType.JET:
self._attr_translation_key = "jet"
self._attr_translation_placeholders = {"pump_id": str(pump.id)}
else:
self._attr_translation_key = "pump"
self._attr_translation_placeholders = {"pump_id": str(pump.id)}
@property
def pump(self) -> SpaPump:
"""Return the underlying SpaPump object for this entity."""
return self.coordinator.data[self.spa.id][ATTR_PUMPS][self.pump_id]
@property
def name(self) -> str:
"""Return a name for this pump entity."""
spa_name = get_spa_name(self.spa)
if self.pump_type == SpaPump.PumpType.CIRCULATION:
return f"{spa_name} Circulation Pump"
if self.pump_type == SpaPump.PumpType.JET:
return f"{spa_name} Jet {self.pump_id}"
return f"{spa_name} pump {self.pump_id}"
@property
def is_on(self) -> bool:
"""Return True if the pump is on."""

View File

@@ -2,8 +2,6 @@
from __future__ import annotations
from typing import Any
from aiopyarr.models.host_configuration import PyArrHostConfiguration
from aiopyarr.sonarr_client import SonarrClient
@@ -18,7 +16,9 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.typing import ConfigType
from .const import (
CONF_BASE_PATH,
@@ -35,15 +35,26 @@ from .coordinator import (
DiskSpaceDataUpdateCoordinator,
QueueDataUpdateCoordinator,
SeriesDataUpdateCoordinator,
SonarrConfigEntry,
SonarrData,
SonarrDataUpdateCoordinator,
StatusDataUpdateCoordinator,
WantedDataUpdateCoordinator,
)
from .services import async_setup_services
PLATFORMS = [Platform.SENSOR]
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the Sonarr integration."""
async_setup_services(hass)
return True
async def async_setup_entry(hass: HomeAssistant, entry: SonarrConfigEntry) -> bool:
"""Set up Sonarr from a config entry."""
if not entry.options:
options = {
@@ -65,29 +76,34 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
host_configuration=host_configuration,
session=async_get_clientsession(hass),
)
coordinators: dict[str, SonarrDataUpdateCoordinator[Any]] = {
"upcoming": CalendarDataUpdateCoordinator(
data = SonarrData(
upcoming=CalendarDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
commands=CommandsDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
diskspace=DiskSpaceDataUpdateCoordinator(
hass, entry, host_configuration, sonarr
),
"commands": CommandsDataUpdateCoordinator(
hass, entry, host_configuration, sonarr
),
"diskspace": DiskSpaceDataUpdateCoordinator(
hass, entry, host_configuration, sonarr
),
"queue": QueueDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
"series": SeriesDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
"status": StatusDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
"wanted": WantedDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
}
queue=QueueDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
series=SeriesDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
status=StatusDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
wanted=WantedDataUpdateCoordinator(hass, entry, host_configuration, sonarr),
)
# Temporary, until we add diagnostic entities
_version = None
for coordinator in coordinators.values():
coordinators: list[SonarrDataUpdateCoordinator] = [
data.upcoming,
data.commands,
data.diskspace,
data.queue,
data.series,
data.status,
data.wanted,
]
for coordinator in coordinators:
await coordinator.async_config_entry_first_refresh()
if isinstance(coordinator, StatusDataUpdateCoordinator):
_version = coordinator.data.version
coordinator.system_version = _version
hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinators
entry.runtime_data = data
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
@@ -117,11 +133,6 @@ async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: SonarrConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_ok:
hass.data[DOMAIN].pop(entry.entry_id)
return unload_ok
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,8 +1,9 @@
"""Constants for Sonarr."""
import logging
from typing import Final
DOMAIN = "sonarr"
DOMAIN: Final = "sonarr"
# Config Keys
CONF_BASE_PATH = "base_path"
@@ -17,5 +18,20 @@ DEFAULT_NAME = "Sonarr"
DEFAULT_UPCOMING_DAYS = 1
DEFAULT_VERIFY_SSL = False
DEFAULT_WANTED_MAX_ITEMS = 50
DEFAULT_MAX_RECORDS: Final = 20
LOGGER = logging.getLogger(__package__)
# Service names
SERVICE_GET_SERIES: Final = "get_series"
SERVICE_GET_EPISODES: Final = "get_episodes"
SERVICE_GET_QUEUE: Final = "get_queue"
SERVICE_GET_DISKSPACE: Final = "get_diskspace"
SERVICE_GET_UPCOMING: Final = "get_upcoming"
SERVICE_GET_WANTED: Final = "get_wanted"
# Service attributes
ATTR_SHOWS: Final = "shows"
ATTR_DISKS: Final = "disks"
ATTR_EPISODES: Final = "episodes"
ATTR_ENTRY_ID: Final = "entry_id"

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
from typing import TypeVar, cast
@@ -40,15 +41,31 @@ SonarrDataT = TypeVar(
)
@dataclass
class SonarrData:
"""Sonarr data type."""
upcoming: CalendarDataUpdateCoordinator
commands: CommandsDataUpdateCoordinator
diskspace: DiskSpaceDataUpdateCoordinator
queue: QueueDataUpdateCoordinator
series: SeriesDataUpdateCoordinator
status: StatusDataUpdateCoordinator
wanted: WantedDataUpdateCoordinator
type SonarrConfigEntry = ConfigEntry[SonarrData]
class SonarrDataUpdateCoordinator(DataUpdateCoordinator[SonarrDataT]):
"""Data update coordinator for the Sonarr integration."""
config_entry: ConfigEntry
config_entry: SonarrConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: SonarrConfigEntry,
host_configuration: PyArrHostConfiguration,
api_client: SonarrClient,
) -> None:

View File

@@ -0,0 +1,416 @@
"""Helper functions for Sonarr."""
from typing import Any
from aiopyarr import (
Diskspace,
SonarrCalendar,
SonarrEpisode,
SonarrQueue,
SonarrSeries,
SonarrWantedMissing,
)
def format_queue_item(item: Any, base_url: str | None = None) -> dict[str, Any]:
"""Format a single queue item."""
# Calculate progress
remaining = 1 if item.size == 0 else item.sizeleft / item.size
remaining_pct = 100 * (1 - remaining)
result: dict[str, Any] = {
"id": item.id,
"series_id": getattr(item, "seriesId", None),
"episode_id": getattr(item, "episodeId", None),
"title": item.series.title,
"download_title": item.title,
"season_number": getattr(item, "seasonNumber", None),
"progress": f"{remaining_pct:.2f}%",
"size": item.size,
"size_left": item.sizeleft,
"status": item.status,
"tracked_download_status": getattr(item, "trackedDownloadStatus", None),
"tracked_download_state": getattr(item, "trackedDownloadState", None),
"download_client": getattr(item, "downloadClient", None),
"download_id": getattr(item, "downloadId", None),
"indexer": getattr(item, "indexer", None),
"protocol": str(getattr(item, "protocol", None)),
"episode_has_file": getattr(item, "episodeHasFile", None),
"estimated_completion_time": str(
getattr(item, "estimatedCompletionTime", None)
),
"time_left": str(getattr(item, "timeleft", None)),
}
# Add episode information from the episode object if available
if episode := getattr(item, "episode", None):
result["episode_number"] = getattr(episode, "episodeNumber", None)
result["episode_title"] = getattr(episode, "title", None)
# Add formatted identifier like the sensor uses (if we have both season and episode)
if result["season_number"] is not None and result["episode_number"] is not None:
result["episode_identifier"] = (
f"S{result['season_number']:02d}E{result['episode_number']:02d}"
)
# Add quality information if available
if quality := getattr(item, "quality", None):
result["quality"] = quality.quality.name
# Add language information if available
if languages := getattr(item, "languages", None):
result["languages"] = [lang["name"] for lang in languages]
# Add custom format score if available
if custom_format_score := getattr(item, "customFormatScore", None):
result["custom_format_score"] = custom_format_score
# Add series images if available
if images := getattr(item.series, "images", None):
result["images"] = {}
for image in images:
cover_type = image.coverType
# Prefer remoteUrl (public TVDB URL) over local path
if remote_url := getattr(image, "remoteUrl", None):
result["images"][cover_type] = remote_url
elif base_url and (url := getattr(image, "url", None)):
result["images"][cover_type] = f"{base_url.rstrip('/')}{url}"
return result
def format_queue(
queue: SonarrQueue, base_url: str | None = None
) -> dict[str, dict[str, Any]]:
"""Format queue for service response."""
# Group queue items by download ID to handle season packs
downloads: dict[str, list[Any]] = {}
for item in queue.records:
download_id = getattr(item, "downloadId", None)
if download_id:
if download_id not in downloads:
downloads[download_id] = []
downloads[download_id].append(item)
shows = {}
for items in downloads.values():
if len(items) == 1:
# Single episode download
item = items[0]
shows[item.title] = format_queue_item(item, base_url)
else:
# Multiple episodes (season pack) - use first item for main data
item = items[0]
formatted = format_queue_item(item, base_url)
# Get all episode numbers for this download
episode_numbers = sorted(
getattr(i.episode, "episodeNumber", 0)
for i in items
if hasattr(i, "episode")
)
# Format as season pack
if episode_numbers:
min_ep = min(episode_numbers)
max_ep = max(episode_numbers)
formatted["is_season_pack"] = True
formatted["episode_count"] = len(episode_numbers)
formatted["episode_range"] = f"E{min_ep:02d}-E{max_ep:02d}"
# Update identifier to show it's a season pack
if formatted.get("season_number") is not None:
formatted["episode_identifier"] = (
f"S{formatted['season_number']:02d} "
f"({len(episode_numbers)} episodes)"
)
shows[item.title] = formatted
return shows
def format_episode_item(
series: SonarrSeries, episode_data: dict[str, Any], base_url: str | None = None
) -> dict[str, Any]:
"""Format a single episode item."""
result: dict[str, Any] = {
"id": episode_data.get("id"),
"episode_number": episode_data.get("episodeNumber"),
"season_number": episode_data.get("seasonNumber"),
"title": episode_data.get("title"),
"air_date": str(episode_data.get("airDate", "")),
"overview": episode_data.get("overview"),
"has_file": episode_data.get("hasFile", False),
"monitored": episode_data.get("monitored", False),
}
# Add episode images if available
if images := episode_data.get("images"):
result["images"] = {}
for image in images:
cover_type = image.coverType
# Prefer remoteUrl (public TVDB URL) over local path
if remote_url := getattr(image, "remoteUrl", None):
result["images"][cover_type] = remote_url
elif base_url and (url := getattr(image, "url", None)):
result["images"][cover_type] = f"{base_url.rstrip('/')}{url}"
return result
def format_series(
series_list: list[SonarrSeries], base_url: str | None = None
) -> dict[str, dict[str, Any]]:
"""Format series list for service response."""
formatted_shows = {}
for series in series_list:
series_title = series.title
formatted_shows[series_title] = {
"id": series.id,
"year": series.year,
"tvdb_id": getattr(series, "tvdbId", None),
"imdb_id": getattr(series, "imdbId", None),
"status": series.status,
"monitored": series.monitored,
}
# Add episode statistics if available (like the sensor shows)
if statistics := getattr(series, "statistics", None):
episode_file_count = getattr(statistics, "episodeFileCount", None)
episode_count = getattr(statistics, "episodeCount", None)
formatted_shows[series_title]["episode_file_count"] = episode_file_count
formatted_shows[series_title]["episode_count"] = episode_count
# Only format episodes_info if we have valid data
if episode_file_count is not None and episode_count is not None:
formatted_shows[series_title]["episodes_info"] = (
f"{episode_file_count}/{episode_count} Episodes"
)
else:
formatted_shows[series_title]["episodes_info"] = None
# Add series images if available
if images := getattr(series, "images", None):
images_dict: dict[str, str] = {}
for image in images:
cover_type = image.coverType
# Prefer remoteUrl (public TVDB URL) over local path
if remote_url := getattr(image, "remoteUrl", None):
images_dict[cover_type] = remote_url
elif base_url and (url := getattr(image, "url", None)):
images_dict[cover_type] = f"{base_url.rstrip('/')}{url}"
formatted_shows[series_title]["images"] = images_dict
return formatted_shows
# Space unit conversion factors (divisors from bytes)
SPACE_UNITS: dict[str, int] = {
"bytes": 1,
"kb": 1000,
"kib": 1024,
"mb": 1000**2,
"mib": 1024**2,
"gb": 1000**3,
"gib": 1024**3,
"tb": 1000**4,
"tib": 1024**4,
"pb": 1000**5,
"pib": 1024**5,
}
def format_diskspace(
disks: list[Diskspace], space_unit: str = "bytes"
) -> dict[str, dict[str, Any]]:
"""Format diskspace for service response.
Args:
disks: List of disk space objects from Sonarr.
space_unit: Unit for space values (bytes, kb, kib, mb, mib, gb, gib, tb, tib, pb, pib).
Returns:
Dictionary of disk information keyed by path.
"""
result = {}
divisor = SPACE_UNITS.get(space_unit, 1)
for disk in disks:
path = disk.path
free_space = disk.freeSpace / divisor
total_space = disk.totalSpace / divisor
result[path] = {
"path": path,
"label": getattr(disk, "label", None) or "",
"free_space": free_space,
"total_space": total_space,
"unit": space_unit,
}
return result
def _format_series_images(series: Any, base_url: str | None = None) -> dict[str, str]:
"""Format series images."""
images_dict: dict[str, str] = {}
if images := getattr(series, "images", None):
for image in images:
cover_type = image.coverType
# Prefer remoteUrl (public TVDB URL) over local path
if remote_url := getattr(image, "remoteUrl", None):
images_dict[cover_type] = remote_url
elif base_url and (url := getattr(image, "url", None)):
images_dict[cover_type] = f"{base_url.rstrip('/')}{url}"
return images_dict
def format_upcoming_item(
episode: SonarrCalendar, base_url: str | None = None
) -> dict[str, Any]:
"""Format a single upcoming episode item."""
result: dict[str, Any] = {
"id": episode.id,
"series_id": episode.seriesId,
"season_number": episode.seasonNumber,
"episode_number": episode.episodeNumber,
"episode_identifier": f"S{episode.seasonNumber:02d}E{episode.episodeNumber:02d}",
"title": episode.title,
"air_date": str(getattr(episode, "airDate", None)),
"air_date_utc": str(getattr(episode, "airDateUtc", None)),
"overview": getattr(episode, "overview", None),
"has_file": getattr(episode, "hasFile", False),
"monitored": getattr(episode, "monitored", True),
"runtime": getattr(episode, "runtime", None),
"finale_type": getattr(episode, "finaleType", None),
}
# Add series information
if series := getattr(episode, "series", None):
result["series_title"] = series.title
result["series_year"] = getattr(series, "year", None)
result["series_tvdb_id"] = getattr(series, "tvdbId", None)
result["series_imdb_id"] = getattr(series, "imdbId", None)
result["series_status"] = getattr(series, "status", None)
result["network"] = getattr(series, "network", None)
result["images"] = _format_series_images(series, base_url)
return result
def format_upcoming(
calendar: list[SonarrCalendar], base_url: str | None = None
) -> dict[str, dict[str, Any]]:
"""Format upcoming calendar for service response."""
episodes = {}
for episode in calendar:
# Create a unique key combining series title and episode identifier
series_title = episode.series.title if hasattr(episode, "series") else "Unknown"
identifier = f"S{episode.seasonNumber:02d}E{episode.episodeNumber:02d}"
key = f"{series_title} {identifier}"
episodes[key] = format_upcoming_item(episode, base_url)
return episodes
def format_wanted_item(item: Any, base_url: str | None = None) -> dict[str, Any]:
"""Format a single wanted episode item."""
result: dict[str, Any] = {
"id": item.id,
"series_id": item.seriesId,
"season_number": item.seasonNumber,
"episode_number": item.episodeNumber,
"episode_identifier": f"S{item.seasonNumber:02d}E{item.episodeNumber:02d}",
"title": item.title,
"air_date": str(getattr(item, "airDate", None)),
"air_date_utc": str(getattr(item, "airDateUtc", None)),
"overview": getattr(item, "overview", None),
"has_file": getattr(item, "hasFile", False),
"monitored": getattr(item, "monitored", True),
"runtime": getattr(item, "runtime", None),
"tvdb_id": getattr(item, "tvdbId", None),
}
# Add series information
if series := getattr(item, "series", None):
result["series_title"] = series.title
result["series_year"] = getattr(series, "year", None)
result["series_tvdb_id"] = getattr(series, "tvdbId", None)
result["series_imdb_id"] = getattr(series, "imdbId", None)
result["series_status"] = getattr(series, "status", None)
result["network"] = getattr(series, "network", None)
result["images"] = _format_series_images(series, base_url)
return result
def format_wanted(
wanted: SonarrWantedMissing, base_url: str | None = None
) -> dict[str, dict[str, Any]]:
"""Format wanted missing episodes for service response."""
episodes = {}
for item in wanted.records:
# Create a unique key combining series title and episode identifier
series_title = (
item.series.title if hasattr(item, "series") and item.series else "Unknown"
)
identifier = f"S{item.seasonNumber:02d}E{item.episodeNumber:02d}"
key = f"{series_title} {identifier}"
episodes[key] = format_wanted_item(item, base_url)
return episodes
def format_episode(episode: SonarrEpisode) -> dict[str, Any]:
"""Format a single episode from a series."""
result: dict[str, Any] = {
"id": episode.id,
"series_id": episode.seriesId,
"tvdb_id": getattr(episode, "tvdbId", None),
"season_number": episode.seasonNumber,
"episode_number": episode.episodeNumber,
"episode_identifier": f"S{episode.seasonNumber:02d}E{episode.episodeNumber:02d}",
"title": episode.title,
"air_date": str(getattr(episode, "airDate", None)),
"air_date_utc": str(getattr(episode, "airDateUtc", None)),
"has_file": getattr(episode, "hasFile", False),
"monitored": getattr(episode, "monitored", False),
"runtime": getattr(episode, "runtime", None),
"episode_file_id": getattr(episode, "episodeFileId", None),
}
# Add overview if available (not always present)
if overview := getattr(episode, "overview", None):
result["overview"] = overview
# Add finale type if applicable
if finale_type := getattr(episode, "finaleType", None):
result["finale_type"] = finale_type
return result
def format_episodes(
episodes: list[SonarrEpisode], season_number: int | None = None
) -> dict[str, dict[str, Any]]:
"""Format episodes list for service response.
Args:
episodes: List of episodes to format.
season_number: Optional season number to filter by.
Returns:
Dictionary of episodes keyed by episode identifier (e.g., "S01E01").
"""
result = {}
for episode in episodes:
# Filter by season if specified
if season_number is not None and episode.seasonNumber != season_number:
continue
identifier = f"S{episode.seasonNumber:02d}E{episode.episodeNumber:02d}"
result[identifier] = format_episode(episode)
return result

View File

@@ -20,5 +20,25 @@
"default": "mdi:television"
}
}
},
"services": {
"get_diskspace": {
"service": "mdi:harddisk"
},
"get_episodes": {
"service": "mdi:filmstrip"
},
"get_queue": {
"service": "mdi:download"
},
"get_series": {
"service": "mdi:television"
},
"get_upcoming": {
"service": "mdi:calendar-clock"
},
"get_wanted": {
"service": "mdi:magnify"
}
}
}

View File

@@ -20,15 +20,13 @@ from homeassistant.components.sensor import (
SensorEntity,
SensorEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfInformation
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.typing import StateType
from homeassistant.util import dt as dt_util
from .const import DOMAIN
from .coordinator import SonarrDataT, SonarrDataUpdateCoordinator
from .coordinator import SonarrConfigEntry, SonarrDataT, SonarrDataUpdateCoordinator
from .entity import SonarrEntity
@@ -40,7 +38,7 @@ class SonarrSensorEntityDescriptionMixIn(Generic[SonarrDataT]):
value_fn: Callable[[SonarrDataT], StateType]
@dataclass(frozen=True)
@dataclass(frozen=True, kw_only=True)
class SonarrSensorEntityDescription(
SensorEntityDescription, SonarrSensorEntityDescriptionMixIn[SonarrDataT]
):
@@ -143,15 +141,12 @@ SENSOR_TYPES: dict[str, SonarrSensorEntityDescription[Any]] = {
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
entry: SonarrConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Sonarr sensors based on a config entry."""
coordinators: dict[str, SonarrDataUpdateCoordinator[Any]] = hass.data[DOMAIN][
entry.entry_id
]
async_add_entities(
SonarrSensor(coordinators[coordinator_type], description)
SonarrSensor(getattr(entry.runtime_data, coordinator_type), description)
for coordinator_type, description in SENSOR_TYPES.items()
)
@@ -162,6 +157,7 @@ class SonarrSensor(SonarrEntity[SonarrDataT], SensorEntity):
coordinator: SonarrDataUpdateCoordinator[SonarrDataT]
entity_description: SonarrSensorEntityDescription[SonarrDataT]
# Note: Sensor extra_state_attributes are deprecated and will be removed in 2026.9
@property
def extra_state_attributes(self) -> dict[str, str]:
"""Return the state attributes of the entity."""

View File

@@ -0,0 +1,284 @@
"""Define services for the Sonarr integration."""
from collections.abc import Awaitable, Callable
from datetime import timedelta
from typing import Any, cast
from aiopyarr import exceptions
import voluptuous as vol
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant, ServiceCall, SupportsResponse, callback
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers import selector
from homeassistant.util import dt as dt_util
from .const import (
ATTR_DISKS,
ATTR_ENTRY_ID,
ATTR_EPISODES,
ATTR_SHOWS,
DEFAULT_UPCOMING_DAYS,
DOMAIN,
SERVICE_GET_DISKSPACE,
SERVICE_GET_EPISODES,
SERVICE_GET_QUEUE,
SERVICE_GET_SERIES,
SERVICE_GET_UPCOMING,
SERVICE_GET_WANTED,
)
from .coordinator import SonarrConfigEntry
from .helpers import (
format_diskspace,
format_episodes,
format_queue,
format_series,
format_upcoming,
format_wanted,
)
# Service parameter constants
CONF_DAYS = "days"
CONF_MAX_ITEMS = "max_items"
CONF_SERIES_ID = "series_id"
CONF_SEASON_NUMBER = "season_number"
CONF_SPACE_UNIT = "space_unit"
# Valid space units
SPACE_UNITS = ["bytes", "kb", "kib", "mb", "mib", "gb", "gib", "tb", "tib", "pb", "pib"]
DEFAULT_SPACE_UNIT = "bytes"
# Default values - 0 means no limit
DEFAULT_MAX_ITEMS = 0
SERVICE_BASE_SCHEMA = vol.Schema(
{
vol.Required(ATTR_ENTRY_ID): selector.ConfigEntrySelector(
{"integration": DOMAIN}
),
}
)
SERVICE_GET_SERIES_SCHEMA = SERVICE_BASE_SCHEMA
SERVICE_GET_EPISODES_SCHEMA = SERVICE_BASE_SCHEMA.extend(
{
vol.Required(CONF_SERIES_ID): vol.All(vol.Coerce(int), vol.Range(min=1)),
vol.Optional(CONF_SEASON_NUMBER): vol.All(vol.Coerce(int), vol.Range(min=0)),
}
)
SERVICE_GET_QUEUE_SCHEMA = SERVICE_BASE_SCHEMA.extend(
{
vol.Optional(CONF_MAX_ITEMS, default=DEFAULT_MAX_ITEMS): vol.All(
vol.Coerce(int), vol.Range(min=0, max=500)
),
}
)
SERVICE_GET_DISKSPACE_SCHEMA = SERVICE_BASE_SCHEMA.extend(
{
vol.Optional(CONF_SPACE_UNIT, default=DEFAULT_SPACE_UNIT): vol.In(SPACE_UNITS),
}
)
SERVICE_GET_UPCOMING_SCHEMA = SERVICE_BASE_SCHEMA.extend(
{
vol.Optional(CONF_DAYS, default=DEFAULT_UPCOMING_DAYS): vol.All(
vol.Coerce(int), vol.Range(min=1, max=30)
),
}
)
SERVICE_GET_WANTED_SCHEMA = SERVICE_BASE_SCHEMA.extend(
{
vol.Optional(CONF_MAX_ITEMS, default=DEFAULT_MAX_ITEMS): vol.All(
vol.Coerce(int), vol.Range(min=0, max=500)
),
}
)
def _get_config_entry_from_service_data(call: ServiceCall) -> SonarrConfigEntry:
"""Return config entry for entry id."""
config_entry_id: str = call.data[ATTR_ENTRY_ID]
if not (entry := call.hass.config_entries.async_get_entry(config_entry_id)):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="integration_not_found",
translation_placeholders={"target": config_entry_id},
)
if entry.state is not ConfigEntryState.LOADED:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="not_loaded",
translation_placeholders={"target": entry.title},
)
return cast(SonarrConfigEntry, entry)
async def _handle_api_errors[_T](func: Callable[[], Awaitable[_T]]) -> _T:
"""Handle API errors and raise HomeAssistantError with user-friendly messages."""
try:
return await func()
except exceptions.ArrAuthenticationException as ex:
raise HomeAssistantError("Authentication failed for Sonarr") from ex
except exceptions.ArrConnectionException as ex:
raise HomeAssistantError("Failed to connect to Sonarr") from ex
except exceptions.ArrException as ex:
raise HomeAssistantError(f"Sonarr API error: {ex}") from ex
async def _async_get_series(service: ServiceCall) -> dict[str, Any]:
"""Get all Sonarr series."""
entry = _get_config_entry_from_service_data(service)
api_client = entry.runtime_data.status.api_client
series_list = await _handle_api_errors(api_client.async_get_series)
base_url = entry.data[CONF_URL]
shows = format_series(cast(list, series_list), base_url)
return {ATTR_SHOWS: shows}
async def _async_get_episodes(service: ServiceCall) -> dict[str, Any]:
"""Get episodes for a specific series."""
entry = _get_config_entry_from_service_data(service)
series_id: int = service.data[CONF_SERIES_ID]
season_number: int | None = service.data.get(CONF_SEASON_NUMBER)
api_client = entry.runtime_data.status.api_client
episodes = await _handle_api_errors(
lambda: api_client.async_get_episodes(series_id, series=True)
)
formatted_episodes = format_episodes(cast(list, episodes), season_number)
return {ATTR_EPISODES: formatted_episodes}
async def _async_get_queue(service: ServiceCall) -> dict[str, Any]:
"""Get Sonarr queue."""
entry = _get_config_entry_from_service_data(service)
max_items: int = service.data[CONF_MAX_ITEMS]
api_client = entry.runtime_data.status.api_client
# 0 means no limit - use a large page size to get all items
page_size = max_items if max_items > 0 else 10000
queue = await _handle_api_errors(
lambda: api_client.async_get_queue(
page_size=page_size, include_series=True, include_episode=True
)
)
base_url = entry.data[CONF_URL]
shows = format_queue(queue, base_url)
return {ATTR_SHOWS: shows}
async def _async_get_diskspace(service: ServiceCall) -> dict[str, Any]:
"""Get Sonarr diskspace information."""
entry = _get_config_entry_from_service_data(service)
space_unit: str = service.data[CONF_SPACE_UNIT]
api_client = entry.runtime_data.status.api_client
disks = await _handle_api_errors(api_client.async_get_diskspace)
return {ATTR_DISKS: format_diskspace(disks, space_unit)}
async def _async_get_upcoming(service: ServiceCall) -> dict[str, Any]:
"""Get Sonarr upcoming episodes."""
entry = _get_config_entry_from_service_data(service)
days: int = service.data[CONF_DAYS]
api_client = entry.runtime_data.status.api_client
local = dt_util.start_of_local_day().replace(microsecond=0)
start = dt_util.as_utc(local)
end = start + timedelta(days=days)
calendar = await _handle_api_errors(
lambda: api_client.async_get_calendar(
start_date=start, end_date=end, include_series=True
)
)
base_url = entry.data[CONF_URL]
episodes = format_upcoming(cast(list, calendar), base_url)
return {ATTR_EPISODES: episodes}
async def _async_get_wanted(service: ServiceCall) -> dict[str, Any]:
"""Get Sonarr wanted/missing episodes."""
entry = _get_config_entry_from_service_data(service)
max_items: int = service.data[CONF_MAX_ITEMS]
api_client = entry.runtime_data.status.api_client
# 0 means no limit - use a large page size to get all items
page_size = max_items if max_items > 0 else 10000
wanted = await _handle_api_errors(
lambda: api_client.async_get_wanted(page_size=page_size, include_series=True)
)
base_url = entry.data[CONF_URL]
episodes = format_wanted(wanted, base_url)
return {ATTR_EPISODES: episodes}
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Register services for the Sonarr integration."""
hass.services.async_register(
DOMAIN,
SERVICE_GET_SERIES,
_async_get_series,
schema=SERVICE_GET_SERIES_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
SERVICE_GET_EPISODES,
_async_get_episodes,
schema=SERVICE_GET_EPISODES_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
SERVICE_GET_QUEUE,
_async_get_queue,
schema=SERVICE_GET_QUEUE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
SERVICE_GET_DISKSPACE,
_async_get_diskspace,
schema=SERVICE_GET_DISKSPACE_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
SERVICE_GET_UPCOMING,
_async_get_upcoming,
schema=SERVICE_GET_UPCOMING_SCHEMA,
supports_response=SupportsResponse.ONLY,
)
hass.services.async_register(
DOMAIN,
SERVICE_GET_WANTED,
_async_get_wanted,
schema=SERVICE_GET_WANTED_SCHEMA,
supports_response=SupportsResponse.ONLY,
)

View File

@@ -0,0 +1,100 @@
get_series:
fields:
entry_id:
required: true
selector:
config_entry:
integration: sonarr
get_queue:
fields:
entry_id:
required: true
selector:
config_entry:
integration: sonarr
max_items:
required: false
default: 0
selector:
number:
min: 0
max: 500
mode: box
get_diskspace:
fields:
entry_id:
required: true
selector:
config_entry:
integration: sonarr
space_unit:
required: false
default: bytes
selector:
select:
options:
- bytes
- kb
- kib
- mb
- mib
- gb
- gib
- tb
- tib
- pb
- pib
get_upcoming:
fields:
entry_id:
required: true
selector:
config_entry:
integration: sonarr
days:
required: false
default: 1
selector:
number:
min: 1
max: 30
mode: box
get_wanted:
fields:
entry_id:
required: true
selector:
config_entry:
integration: sonarr
max_items:
required: false
default: 0
selector:
number:
min: 0
max: 500
mode: box
get_episodes:
fields:
entry_id:
required: true
selector:
config_entry:
integration: sonarr
series_id:
required: true
selector:
number:
min: 1
mode: box
season_number:
required: false
selector:
number:
min: 0
mode: box

View File

@@ -51,6 +51,14 @@
}
}
},
"exceptions": {
"integration_not_found": {
"message": "Config entry for integration \"{target}\" not found."
},
"not_loaded": {
"message": "Config entry \"{target}\" is not loaded."
}
},
"options": {
"step": {
"init": {
@@ -60,5 +68,91 @@
}
}
}
},
"services": {
"get_diskspace": {
"description": "Gets disk space information for all configured paths.",
"fields": {
"entry_id": {
"description": "ID of the config entry to use.",
"name": "Sonarr entry"
},
"space_unit": {
"description": "Unit for space values. Use binary units (kib, mib, gib, tib, pib) for 1024-based values or decimal units (kb, mb, gb, tb, pb) for 1000-based values.",
"name": "Space unit"
}
},
"name": "Get disk space"
},
"get_episodes": {
"description": "Gets episodes for a specific series.",
"fields": {
"entry_id": {
"description": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::description%]",
"name": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::name%]"
},
"season_number": {
"description": "Optional season number to filter episodes by.",
"name": "Season number"
},
"series_id": {
"description": "The ID of the series to get episodes for.",
"name": "Series ID"
}
},
"name": "Get episodes"
},
"get_queue": {
"description": "Gets all episodes currently in the download queue with their progress and details.",
"fields": {
"entry_id": {
"description": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::description%]",
"name": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::name%]"
},
"max_items": {
"description": "Maximum number of items to return (0 = no limit).",
"name": "Max items"
}
},
"name": "Get queue"
},
"get_series": {
"description": "Gets all series in Sonarr with their details and statistics.",
"fields": {
"entry_id": {
"description": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::description%]",
"name": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::name%]"
}
},
"name": "Get series"
},
"get_upcoming": {
"description": "Gets upcoming episodes from the calendar.",
"fields": {
"days": {
"description": "Number of days to look ahead for upcoming episodes.",
"name": "Days"
},
"entry_id": {
"description": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::description%]",
"name": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::name%]"
}
},
"name": "Get upcoming"
},
"get_wanted": {
"description": "Gets wanted/missing episodes that are being searched for.",
"fields": {
"entry_id": {
"description": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::description%]",
"name": "[%key:component::sonarr::services::get_diskspace::fields::entry_id::name%]"
},
"max_items": {
"description": "[%key:component::sonarr::services::get_queue::fields::max_items::description%]",
"name": "[%key:component::sonarr::services::get_queue::fields::max_items::name%]"
}
},
"name": "Get wanted"
}
}
}

View File

@@ -330,7 +330,7 @@ async def root_payload(
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="favorites",
thumbnail="https://brands.home-assistant.io/_/sonos/logo.png",
thumbnail="/api/brands/integration/sonos/logo.png",
can_play=False,
can_expand=True,
)
@@ -345,7 +345,7 @@ async def root_payload(
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type="library",
thumbnail="https://brands.home-assistant.io/_/sonos/logo.png",
thumbnail="/api/brands/integration/sonos/logo.png",
can_play=False,
can_expand=True,
)
@@ -358,7 +358,7 @@ async def root_payload(
media_class=MediaClass.APP,
media_content_id="",
media_content_type="plex",
thumbnail="https://brands.home-assistant.io/_/plex/logo.png",
thumbnail="/api/brands/integration/plex/logo.png",
can_play=False,
can_expand=True,
)

View File

@@ -212,7 +212,7 @@ async def async_browse_media(
media_class=MediaClass.APP,
media_content_id=f"{MEDIA_PLAYER_PREFIX}{config_entry.entry_id}",
media_content_type=f"{MEDIA_PLAYER_PREFIX}library",
thumbnail="https://brands.home-assistant.io/_/spotify/logo.png",
thumbnail="/api/brands/integration/spotify/logo.png",
can_play=False,
can_expand=True,
)
@@ -223,7 +223,7 @@ async def async_browse_media(
media_class=MediaClass.APP,
media_content_id=MEDIA_PLAYER_PREFIX,
media_content_type="spotify",
thumbnail="https://brands.home-assistant.io/_/spotify/logo.png",
thumbnail="/api/brands/integration/spotify/logo.png",
can_play=False,
can_expand=True,
children=children,

View File

@@ -6,4 +6,4 @@ from homeassistant.const import Platform
DOMAIN = "systemnexa2"
MANUFACTURER = "NEXA"
PLATFORMS: Final = [Platform.LIGHT, Platform.SWITCH]
PLATFORMS: Final = [Platform.LIGHT, Platform.SENSOR, Platform.SWITCH]

View File

@@ -0,0 +1,40 @@
"""Diagnostics support for System Nexa 2."""
from __future__ import annotations
from dataclasses import asdict
from typing import Any
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.const import CONF_DEVICE_ID, CONF_HOST
from homeassistant.core import HomeAssistant
from .coordinator import SystemNexa2ConfigEntry
TO_REDACT = {
CONF_HOST,
CONF_DEVICE_ID,
"unique_id",
"wifi_ssid",
}
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: SystemNexa2ConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
coordinator = entry.runtime_data
return {
"config_entry": async_redact_data(dict(entry.data), TO_REDACT),
"device_info": async_redact_data(asdict(coordinator.data.info_data), TO_REDACT),
"coordinator_available": coordinator.last_update_success,
"state": coordinator.data.state,
"settings": {
name: {
"name": setting.name,
"enabled": setting.is_enabled(),
}
for name, setting in coordinator.data.on_off_settings.items()
},
}

View File

@@ -45,7 +45,7 @@ rules:
# Gold
devices: done
diagnostics: todo
diagnostics: done
discovery-update-info: done
discovery: done
docs-data-update: done

View File

@@ -0,0 +1,77 @@
"""Sensor platform for SystemNexa2 integration."""
from collections.abc import Callable
from dataclasses import dataclass
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import SIGNAL_STRENGTH_DECIBELS_MILLIWATT, EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import SystemNexa2ConfigEntry, SystemNexa2DataUpdateCoordinator
from .entity import SystemNexa2Entity
PARALLEL_UPDATES = 0
@dataclass(frozen=True, kw_only=True)
class SystemNexa2SensorEntityDescription(SensorEntityDescription):
"""Describes SystemNexa2 sensor entity."""
value_fn: Callable[[SystemNexa2DataUpdateCoordinator], str | int | None]
SENSOR_DESCRIPTIONS: tuple[SystemNexa2SensorEntityDescription, ...] = (
SystemNexa2SensorEntityDescription(
key="wifi_dbm",
native_unit_of_measurement=SIGNAL_STRENGTH_DECIBELS_MILLIWATT,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
state_class=SensorStateClass.MEASUREMENT,
entity_category=EntityCategory.DIAGNOSTIC,
value_fn=lambda coordinator: coordinator.data.info_data.wifi_dbm,
entity_registry_enabled_default=False,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: SystemNexa2ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up sensors based on a config entry."""
coordinator = entry.runtime_data
async_add_entities(
SystemNexa2Sensor(coordinator, description)
for description in SENSOR_DESCRIPTIONS
if description.value_fn(coordinator) is not None
)
class SystemNexa2Sensor(SystemNexa2Entity, SensorEntity):
"""Representation of a SystemNexa2 sensor."""
entity_description: SystemNexa2SensorEntityDescription
def __init__(
self,
coordinator: SystemNexa2DataUpdateCoordinator,
entity_description: SystemNexa2SensorEntityDescription,
) -> None:
"""Initialize the sensor."""
super().__init__(
coordinator=coordinator,
key=entity_description.key,
)
self.entity_description = entity_description
@property
def native_value(self) -> str | int | None:
"""Return the state of the sensor."""
return self.entity_description.value_fn(self.coordinator)

View File

@@ -266,7 +266,7 @@ class StateUpdateEntity(TemplateEntity, AbstractTemplateUpdate):
# The default picture for update entities would use `self.platform.platform_name` in
# place of `template`. This does not work when creating an entity preview because
# the platform does not exist for that entity, therefore this is hardcoded as `template`.
return "https://brands.home-assistant.io/_/template/icon.png"
return "/api/brands/integration/template/icon.png"
return self._attr_entity_picture

View File

@@ -0,0 +1,87 @@
rules:
# Bronze
action-setup:
status: exempt
comment: |
No custom actions are defined. Only entity-based actions exist.
appropriate-polling: done
brands: done
common-modules: done
config-flow: done
config-flow-test-coverage: done
dependency-transparency: done
docs-actions:
status: exempt
comment: |
No custom actions are defined. Only entity-based actions exist.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: |
Integration uses coordinators for data updates, no explicit event subscriptions.
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: |
No custom actions are defined. Only entity-based actions exist.
config-entry-unloading: done
docs-configuration-parameters: todo
docs-installation-parameters: done
entity-unavailable: done
integration-owner: done
log-when-unavailable:
status: done
comment: |
Handled by coordinators.
parallel-updates: done
reauthentication-flow: done
test-coverage: done
# Gold
devices: done
diagnostics: done
discovery:
status: exempt
comment: |
Cloud-based service without local discovery capabilities.
discovery-update-info:
status: exempt
comment: |
Cloud-based service without local discovery capabilities.
docs-data-update: todo
docs-examples: done
docs-known-limitations: done
docs-supported-devices: done
docs-supported-functions: done
docs-troubleshooting: done
docs-use-cases: done
dynamic-devices: done
entity-category: done
entity-device-class: done
entity-disabled-by-default: done
entity-translations: done
exception-translations:
status: todo
comment: |
Most user-facing exceptions have translations (HomeAssistantError and
ServiceValidationError use translation keys from strings.json). Remaining:
entity.py raises bare HomeAssistantError for ClientResponseError, and
coordinators raise UpdateFailed with untranslated messages.
icon-translations: done
reconfiguration-flow: todo
repair-issues: todo
stale-devices: todo
# Platinum
async-dependency: done
inject-websession: done
strict-typing: todo

View File

@@ -10,9 +10,9 @@ from .coordinator import DeviceNotFound, ToGrillConfigEntry, ToGrillCoordinator
_PLATFORMS: list[Platform] = [
Platform.EVENT,
Platform.NUMBER,
Platform.SELECT,
Platform.SENSOR,
Platform.NUMBER,
]

View File

@@ -214,7 +214,7 @@ class TTSMediaSource(MediaSource):
media_class=MediaClass.APP,
media_content_type="provider",
title=engine_instance.name,
thumbnail=f"https://brands.home-assistant.io/_/{engine_domain}/logo.png",
thumbnail=f"/api/brands/integration/{engine_domain}/logo.png",
can_play=False,
can_expand=True,
)

View File

@@ -290,9 +290,7 @@ class UpdateEntity(
Update entities return the brand icon based on the integration
domain by default.
"""
return (
f"https://brands.home-assistant.io/_/{self.platform.platform_name}/icon.png"
)
return f"/api/brands/integration/{self.platform.platform_name}/icon.png"
@cached_property
def in_progress(self) -> bool | None:

View File

@@ -14,7 +14,7 @@
"velbus-protocol"
],
"quality_scale": "silver",
"requirements": ["velbus-aio==2026.1.4"],
"requirements": ["velbus-aio==2026.2.0"],
"usb": [
{
"pid": "0B1B",

View File

@@ -411,7 +411,7 @@
},
"services": {
"get_image_url": {
"description": "Get the URL for one or more vehicle-specific images.",
"description": "Retrieves the URL for one or more vehicle-specific images.",
"fields": {
"entry": {
"description": "The entry to retrieve the vehicle images for.",

View File

@@ -4,6 +4,7 @@
"codeowners": ["@mampfes"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/wiffi",
"integration_type": "hub",
"iot_class": "local_push",
"loggers": ["wiffi"],
"requirements": ["wiffi==1.1.2"]

View File

@@ -4,6 +4,7 @@
"codeowners": ["@emlove"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/zerproc",
"integration_type": "hub",
"iot_class": "local_polling",
"loggers": ["bleak", "pyzerproc"],
"requirements": ["pyzerproc==0.4.8"]

View File

@@ -16,8 +16,8 @@ from .coordinator import ZinvoltConfigEntry, ZinvoltDeviceCoordinator
_PLATFORMS: list[Platform] = [
Platform.BINARY_SENSOR,
Platform.SENSOR,
Platform.NUMBER,
Platform.SENSOR,
]
@@ -34,7 +34,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ZinvoltConfigEntry) -> b
coordinators: dict[str, ZinvoltDeviceCoordinator] = {}
tasks = []
for battery in batteries:
coordinator = ZinvoltDeviceCoordinator(hass, entry, client, battery.identifier)
coordinator = ZinvoltDeviceCoordinator(hass, entry, client, battery)
tasks.append(coordinator.async_config_entry_first_refresh())
coordinators[battery.identifier] = coordinator
await asyncio.gather(*tasks)

View File

@@ -5,7 +5,7 @@ import logging
from zinvolt import ZinvoltClient
from zinvolt.exceptions import ZinvoltError
from zinvolt.models import BatteryState
from zinvolt.models import Battery, BatteryState
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@@ -26,23 +26,23 @@ class ZinvoltDeviceCoordinator(DataUpdateCoordinator[BatteryState]):
hass: HomeAssistant,
config_entry: ZinvoltConfigEntry,
client: ZinvoltClient,
battery_id: str,
battery: Battery,
) -> None:
"""Initialize the Zinvolt device."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=f"Zinvolt {battery_id}",
name=f"Zinvolt {battery.identifier}",
update_interval=timedelta(minutes=5),
)
self.battery_id = battery_id
self.battery = battery
self.client = client
async def _async_update_data(self) -> BatteryState:
"""Update data from Zinvolt."""
try:
return await self.client.get_battery_status(self.battery_id)
return await self.client.get_battery_status(self.battery.identifier)
except ZinvoltError as err:
raise UpdateFailed(
translation_key="update_failed",

View File

@@ -18,6 +18,6 @@ class ZinvoltEntity(CoordinatorEntity[ZinvoltDeviceCoordinator]):
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, coordinator.data.serial_number)},
manufacturer="Zinvolt",
name=coordinator.data.name,
name=coordinator.battery.name,
serial_number=coordinator.data.serial_number,
)

View File

@@ -126,5 +126,6 @@ class ZinvoltBatteryStateNumber(ZinvoltEntity, NumberEntity):
async def async_set_native_value(self, value: float) -> None:
"""Set the state of the sensor."""
await self.entity_description.set_value_fn(
self.coordinator.client, self.coordinator.battery_id, int(value)
self.coordinator.client, self.coordinator.battery.identifier, int(value)
)
await self.coordinator.async_request_refresh()

View File

@@ -9,8 +9,9 @@ from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import PERCENTAGE, EntityCategory
from homeassistant.const import PERCENTAGE, UnitOfPower
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
@@ -28,11 +29,18 @@ class ZinvoltBatteryStateDescription(SensorEntityDescription):
SENSORS: tuple[ZinvoltBatteryStateDescription, ...] = (
ZinvoltBatteryStateDescription(
key="state_of_charge",
entity_category=EntityCategory.DIAGNOSTIC,
device_class=SensorDeviceClass.BATTERY,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
value_fn=lambda state: state.current_power.state_of_charge,
),
ZinvoltBatteryStateDescription(
key="power",
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfPower.WATT,
value_fn=lambda state: 0 - state.current_power.power_socket_output,
),
)

View File

@@ -8,6 +8,9 @@
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"initiate_flow": {
"user": "[%key:common::config_flow::initiate_flow::account%]"
},
"step": {
"user": {
"data": {

View File

@@ -882,6 +882,11 @@ class Integration:
"""Return if the integration has translations."""
return "translations" in self._top_level_files
@cached_property
def has_branding(self) -> bool:
"""Return if the integration has brand assets."""
return "brand" in self._top_level_files
@cached_property
def has_triggers(self) -> bool:
"""Return if the integration has triggers."""

View File

@@ -40,7 +40,7 @@ habluetooth==5.8.0
hass-nabucasa==1.15.0
hassil==3.5.0
home-assistant-bluetooth==1.13.1
home-assistant-frontend==20260128.6
home-assistant-frontend==20260225.0
home-assistant-intents==2026.2.13
httpx==0.28.1
ifaddr==0.2.0
@@ -71,7 +71,7 @@ standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0
ulid-transform==1.5.2
urllib3>=2.0
uv==0.9.26
uv==0.10.6
voluptuous-openapi==0.2.0
voluptuous-serialize==2.7.0
voluptuous==0.15.2

View File

@@ -36,7 +36,7 @@ class HassEnforceSortedPlatformsChecker(BaseChecker):
"""Check for sorted PLATFORMS const."""
if (
isinstance(target, nodes.AssignName)
and target.name == "PLATFORMS"
and target.name in {"PLATFORMS", "_PLATFORMS"}
and isinstance(node.value, nodes.List)
):
platforms = [v.as_string() for v in node.value.elts]

View File

@@ -78,7 +78,7 @@ dependencies = [
"typing-extensions>=4.15.0,<5.0",
"ulid-transform==1.5.2",
"urllib3>=2.0",
"uv==0.9.26",
"uv==0.10.6",
"voluptuous==0.15.2",
"voluptuous-serialize==2.7.0",
"voluptuous-openapi==0.2.0",

2
requirements.txt generated
View File

@@ -54,7 +54,7 @@ standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0
ulid-transform==1.5.2
urllib3>=2.0
uv==0.9.26
uv==0.10.6
voluptuous-openapi==0.2.0
voluptuous-serialize==2.7.0
voluptuous==0.15.2

10
requirements_all.txt generated
View File

@@ -1192,7 +1192,7 @@ hassil==3.5.0
hdate[astral]==1.1.2
# homeassistant.components.hdfury
hdfury==1.5.0
hdfury==1.6.0
# homeassistant.components.heatmiser
heatmiserV3==2.0.4
@@ -1226,7 +1226,7 @@ hole==0.9.0
holidays==0.84
# homeassistant.components.frontend
home-assistant-frontend==20260128.6
home-assistant-frontend==20260225.0
# homeassistant.components.conversation
home-assistant-intents==2026.2.13
@@ -1374,7 +1374,7 @@ kiwiki-client==0.1.1
knocki==0.4.2
# homeassistant.components.knx
knx-frontend==2026.2.13.222258
knx-frontend==2026.2.25.165736
# homeassistant.components.konnected
konnected==1.2.0
@@ -2530,7 +2530,7 @@ python-awair==0.2.5
python-blockchain-api==0.0.2
# homeassistant.components.bsblan
python-bsblan==5.0.1
python-bsblan==5.1.0
# homeassistant.components.citybikes
python-citybikes==0.3.3
@@ -3189,7 +3189,7 @@ vegehub==0.1.26
vehicle==2.2.2
# homeassistant.components.velbus
velbus-aio==2026.1.4
velbus-aio==2026.2.0
# homeassistant.components.venstar
venstarcolortouch==0.21

View File

@@ -1062,7 +1062,7 @@ hassil==3.5.0
hdate[astral]==1.1.2
# homeassistant.components.hdfury
hdfury==1.5.0
hdfury==1.6.0
# homeassistant.components.hegel
hegel-ip-client==0.1.4
@@ -1087,7 +1087,7 @@ hole==0.9.0
holidays==0.84
# homeassistant.components.frontend
home-assistant-frontend==20260128.6
home-assistant-frontend==20260225.0
# homeassistant.components.conversation
home-assistant-intents==2026.2.13
@@ -1211,7 +1211,7 @@ kegtron-ble==1.0.2
knocki==0.4.2
# homeassistant.components.knx
knx-frontend==2026.2.13.222258
knx-frontend==2026.2.25.165736
# homeassistant.components.konnected
konnected==1.2.0
@@ -2153,7 +2153,7 @@ python-MotionMount==2.3.0
python-awair==0.2.5
# homeassistant.components.bsblan
python-bsblan==5.0.1
python-bsblan==5.1.0
# homeassistant.components.ecobee
python-ecobee-api==0.3.2
@@ -2683,7 +2683,7 @@ vegehub==0.1.26
vehicle==2.2.2
# homeassistant.components.velbus
velbus-aio==2026.1.4
velbus-aio==2026.2.0
# homeassistant.components.venstar
venstarcolortouch==0.21

View File

@@ -14,7 +14,7 @@ WORKDIR "/github/workspace"
COPY . /usr/src/homeassistant
# Uv is only needed during build
RUN --mount=from=ghcr.io/astral-sh/uv:0.9.26,source=/uv,target=/bin/uv \
RUN --mount=from=ghcr.io/astral-sh/uv:0.10.6,source=/uv,target=/bin/uv \
# Uv creates a lock file in /tmp
--mount=type=tmpfs,target=/tmp \
# Required for PyTurboJPEG

View File

@@ -62,6 +62,7 @@ NO_IOT_CLASS = [
"auth",
"automation",
"blueprint",
"brands",
"color_extractor",
"config",
"configurator",

View File

@@ -941,7 +941,6 @@ INTEGRATIONS_WITHOUT_QUALITY_SCALE_FILE = [
"template",
"tesla_fleet",
"tesla_wall_connector",
"tessie",
"tfiac",
"thermobeacon",
"thermopro",
@@ -2106,6 +2105,7 @@ NO_QUALITY_SCALE = [
"auth",
"automation",
"blueprint",
"brands",
"config",
"configurator",
"counter",

View File

@@ -40,7 +40,7 @@
'attributes': ReadOnlyDict({
'auto_update': False,
'display_precision': 0,
'entity_picture': 'https://brands.home-assistant.io/_/adguard/icon.png',
'entity_picture': '/api/brands/integration/adguard/icon.png',
'friendly_name': 'AdGuard Home',
'in_progress': False,
'installed_version': 'v0.107.50',

View File

@@ -41,7 +41,7 @@
'auto_update': False,
'device_class': 'firmware',
'display_precision': 0,
'entity_picture': 'https://brands.home-assistant.io/_/airgradient/icon.png',
'entity_picture': '/api/brands/integration/airgradient/icon.png',
'friendly_name': 'Airgradient Firmware',
'in_progress': False,
'installed_version': '3.1.1',

View File

@@ -0,0 +1,52 @@
# serializer version: 1
# name: test_cover_entities[cover.test_door-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'cover',
'entity_category': None,
'entity_id': 'cover.test_door',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': None,
'options': dict({
}),
'original_device_class': <CoverDeviceClass.GARAGE: 'garage'>,
'original_icon': None,
'original_name': None,
'platform': 'aladdin_connect',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <CoverEntityFeature: 3>,
'translation_key': None,
'unique_id': 'test_device_id-1',
'unit_of_measurement': None,
})
# ---
# name: test_cover_entities[cover.test_door-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'garage',
'friendly_name': 'Test Door',
'supported_features': <CoverEntityFeature: 3>,
}),
'context': <ANY>,
'entity_id': 'cover.test_door',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'closed',
})
# ---

View File

@@ -0,0 +1,55 @@
# serializer version: 1
# name: test_sensor_entities[sensor.test_door_battery-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.test_door_battery',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Battery',
'options': dict({
}),
'original_device_class': <SensorDeviceClass.BATTERY: 'battery'>,
'original_icon': None,
'original_name': 'Battery',
'platform': 'aladdin_connect',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': 'test_device_id-1-battery_level',
'unit_of_measurement': '%',
})
# ---
# name: test_sensor_entities[sensor.test_door_battery-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'battery',
'friendly_name': 'Test Door Battery',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'sensor.test_door_battery',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '100',
})
# ---

View File

@@ -0,0 +1,135 @@
"""Tests for the Aladdin Connect cover platform."""
from unittest.mock import AsyncMock, patch
import aiohttp
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.cover import (
DOMAIN as COVER_DOMAIN,
SERVICE_CLOSE_COVER,
SERVICE_OPEN_COVER,
)
from homeassistant.const import ATTR_ENTITY_ID, STATE_UNAVAILABLE, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import init_integration
from tests.common import MockConfigEntry, async_fire_time_changed, snapshot_platform
ENTITY_ID = "cover.test_door"
async def _setup(hass: HomeAssistant, entry: MockConfigEntry) -> None:
"""Set up integration with only the cover platform."""
with patch("homeassistant.components.aladdin_connect.PLATFORMS", [Platform.COVER]):
await init_integration(hass, entry)
async def test_cover_entities(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test the cover entity states and attributes."""
await _setup(hass, mock_config_entry)
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
async def test_open_cover(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_aladdin_connect_api: AsyncMock,
) -> None:
"""Test opening the cover."""
await _setup(hass, mock_config_entry)
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_OPEN_COVER,
{ATTR_ENTITY_ID: ENTITY_ID},
blocking=True,
)
mock_aladdin_connect_api.open_door.assert_called_once_with("test_device_id", 1)
async def test_close_cover(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_aladdin_connect_api: AsyncMock,
) -> None:
"""Test closing the cover."""
await _setup(hass, mock_config_entry)
await hass.services.async_call(
COVER_DOMAIN,
SERVICE_CLOSE_COVER,
{ATTR_ENTITY_ID: ENTITY_ID},
blocking=True,
)
mock_aladdin_connect_api.close_door.assert_called_once_with("test_device_id", 1)
@pytest.mark.parametrize(
("status", "expected_closed", "expected_opening", "expected_closing"),
[
("closed", True, False, False),
("open", False, False, False),
("opening", False, True, False),
("closing", False, False, True),
],
)
async def test_cover_states(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_aladdin_connect_api: AsyncMock,
status: str,
expected_closed: bool,
expected_opening: bool,
expected_closing: bool,
) -> None:
"""Test cover state properties."""
mock_aladdin_connect_api.get_doors.return_value[0].status = status
await _setup(hass, mock_config_entry)
state = hass.states.get(ENTITY_ID)
assert state is not None
assert (state.state == "closed") == expected_closed
assert (state.state == "opening") == expected_opening
assert (state.state == "closing") == expected_closing
async def test_cover_none_status(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_aladdin_connect_api: AsyncMock,
) -> None:
"""Test cover state when status is None."""
mock_aladdin_connect_api.get_doors.return_value[0].status = None
await _setup(hass, mock_config_entry)
state = hass.states.get(ENTITY_ID)
assert state is not None
assert state.state == "unknown"
async def test_cover_unavailable(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_aladdin_connect_api: AsyncMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test cover becomes unavailable when coordinator update fails."""
await _setup(hass, mock_config_entry)
state = hass.states.get(ENTITY_ID)
assert state is not None
assert state.state != STATE_UNAVAILABLE
mock_aladdin_connect_api.update_door.side_effect = aiohttp.ClientError()
freezer.tick(15)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get(ENTITY_ID)
assert state is not None
assert state.state == STATE_UNAVAILABLE

View File

@@ -0,0 +1,59 @@
"""Tests for the Aladdin Connect sensor platform."""
from unittest.mock import AsyncMock, patch
import aiohttp
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.const import STATE_UNAVAILABLE, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from . import init_integration
from tests.common import MockConfigEntry, async_fire_time_changed, snapshot_platform
ENTITY_ID = "sensor.test_door_battery"
async def _setup(hass: HomeAssistant, entry: MockConfigEntry) -> None:
"""Set up integration with only the sensor platform."""
with patch("homeassistant.components.aladdin_connect.PLATFORMS", [Platform.SENSOR]):
await init_integration(hass, entry)
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_sensor_entities(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test the sensor entity states and attributes."""
await _setup(hass, mock_config_entry)
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_sensor_unavailable(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_aladdin_connect_api: AsyncMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test sensor becomes unavailable when coordinator update fails."""
await _setup(hass, mock_config_entry)
state = hass.states.get(ENTITY_ID)
assert state is not None
assert state.state != STATE_UNAVAILABLE
mock_aladdin_connect_api.update_door.side_effect = aiohttp.ClientError()
freezer.tick(15)
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get(ENTITY_ID)
assert state is not None
assert state.state == STATE_UNAVAILABLE

View File

@@ -122,12 +122,19 @@ async def test_abort_if_already_configured(
assert result["reason"] == "already_configured"
@pytest.mark.parametrize(
("endpoint_url"),
[
("@@@"),
("http://example.com"),
],
)
async def test_flow_create_not_aws_endpoint(
hass: HomeAssistant,
hass: HomeAssistant, endpoint_url: str
) -> None:
"""Test config flow with a not aws endpoint should raise an error."""
result = await _async_start_flow(
hass, USER_INPUT | {CONF_ENDPOINT_URL: "http://example.com"}
hass, USER_INPUT | {CONF_ENDPOINT_URL: endpoint_url}
)
assert result["type"] is FlowResultType.FORM

View File

@@ -0,0 +1 @@
"""Tests for the Brands integration."""

View File

@@ -0,0 +1,20 @@
"""Test configuration for the Brands integration."""
import pytest
from tests.typing import ClientSessionGenerator
@pytest.fixture
def hass_config_dir(hass_tmp_config_dir: str) -> str:
"""Use temporary config directory for brands tests."""
return hass_tmp_config_dir
@pytest.fixture
def aiohttp_client(
aiohttp_client: ClientSessionGenerator,
socket_enabled: None,
) -> ClientSessionGenerator:
"""Return aiohttp_client and allow opening sockets."""
return aiohttp_client

View File

@@ -0,0 +1,903 @@
"""Tests for the Brands integration."""
from datetime import timedelta
from http import HTTPStatus
import os
from pathlib import Path
import time
from unittest.mock import patch
from aiohttp import ClientError
from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant.components.brands.const import (
BRANDS_CDN_URL,
CACHE_TTL,
DOMAIN,
TOKEN_CHANGE_INTERVAL,
)
from homeassistant.core import HomeAssistant
from homeassistant.loader import Integration
from homeassistant.setup import async_setup_component
from tests.common import async_fire_time_changed
from tests.test_util.aiohttp import AiohttpClientMocker
from tests.typing import ClientSessionGenerator, WebSocketGenerator
FAKE_PNG = b"\x89PNG\r\n\x1a\nfakeimagedata"
@pytest.fixture(autouse=True)
async def setup_brands(hass: HomeAssistant) -> None:
"""Set up the brands integration for all tests."""
assert await async_setup_component(hass, "http", {"http": {}})
assert await async_setup_component(hass, DOMAIN, {})
def _create_custom_integration(
hass: HomeAssistant,
domain: str,
*,
has_branding: bool = False,
) -> Integration:
"""Create a mock custom integration."""
top_level = {"__init__.py", "manifest.json"}
if has_branding:
top_level.add("brand")
return Integration(
hass,
f"custom_components.{domain}",
Path(hass.config.config_dir) / "custom_components" / domain,
{
"name": domain,
"domain": domain,
"config_flow": False,
"dependencies": [],
"requirements": [],
"version": "1.0.0",
},
top_level,
)
# ------------------------------------------------------------------
# Integration view: /api/brands/integration/{domain}/{image}
# ------------------------------------------------------------------
async def test_integration_view_serves_from_cdn(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test serving an integration brand image from the CDN."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/hue/icon.png",
content=FAKE_PNG,
)
client = await hass_client()
resp = await client.get("/api/brands/integration/hue/icon.png")
assert resp.status == HTTPStatus.OK
assert resp.content_type == "image/png"
assert await resp.read() == FAKE_PNG
async def test_integration_view_default_placeholder_fallback(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that CDN 404 serves placeholder by default."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/nonexistent/icon.png",
status=HTTPStatus.NOT_FOUND,
)
aioclient_mock.get(
f"{BRANDS_CDN_URL}/_/_placeholder/icon.png",
content=FAKE_PNG,
)
client = await hass_client()
resp = await client.get("/api/brands/integration/nonexistent/icon.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
async def test_integration_view_no_placeholder(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that CDN 404 returns 404 when placeholder=no is set."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/nonexistent/icon.png",
status=HTTPStatus.NOT_FOUND,
)
client = await hass_client()
resp = await client.get(
"/api/brands/integration/nonexistent/icon.png?placeholder=no"
)
assert resp.status == HTTPStatus.NOT_FOUND
async def test_integration_view_invalid_domain(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
) -> None:
"""Test that invalid domain names return 404."""
client = await hass_client()
resp = await client.get("/api/brands/integration/INVALID/icon.png")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/integration/../etc/icon.png")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/integration/has spaces/icon.png")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/integration/_leading/icon.png")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/integration/trailing_/icon.png")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/integration/double__under/icon.png")
assert resp.status == HTTPStatus.NOT_FOUND
async def test_integration_view_invalid_image(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
) -> None:
"""Test that invalid image filenames return 404."""
client = await hass_client()
resp = await client.get("/api/brands/integration/hue/malicious.jpg")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/integration/hue/../../etc/passwd")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/integration/hue/notallowed.png")
assert resp.status == HTTPStatus.NOT_FOUND
async def test_integration_view_all_allowed_images(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that all allowed image filenames are accepted."""
allowed = [
"icon.png",
"logo.png",
"icon@2x.png",
"logo@2x.png",
"dark_icon.png",
"dark_logo.png",
"dark_icon@2x.png",
"dark_logo@2x.png",
]
for image in allowed:
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/hue/{image}",
content=FAKE_PNG,
)
client = await hass_client()
for image in allowed:
resp = await client.get(f"/api/brands/integration/hue/{image}")
assert resp.status == HTTPStatus.OK, f"Failed for {image}"
async def test_integration_view_cdn_error_returns_none(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that CDN connection errors result in 404 with placeholder=no."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/broken/icon.png",
exc=ClientError(),
)
client = await hass_client()
resp = await client.get("/api/brands/integration/broken/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
async def test_integration_view_cdn_unexpected_status(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that unexpected CDN status codes result in 404 with placeholder=no."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/broken/icon.png",
status=HTTPStatus.INTERNAL_SERVER_ERROR,
)
client = await hass_client()
resp = await client.get("/api/brands/integration/broken/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
# ------------------------------------------------------------------
# Disk caching
# ------------------------------------------------------------------
async def test_disk_cache_hit(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that a second request is served from disk cache."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/hue/icon.png",
content=FAKE_PNG,
)
client = await hass_client()
# First request: fetches from CDN
resp = await client.get("/api/brands/integration/hue/icon.png")
assert resp.status == HTTPStatus.OK
assert aioclient_mock.call_count == 1
# Second request: served from disk cache
resp = await client.get("/api/brands/integration/hue/icon.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
assert aioclient_mock.call_count == 1 # No additional CDN call
async def test_disk_cache_404_marker(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that 404s are cached as empty files."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/nothing/icon.png",
status=HTTPStatus.NOT_FOUND,
)
client = await hass_client()
# First request: CDN returns 404, cached as empty file
resp = await client.get("/api/brands/integration/nothing/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
assert aioclient_mock.call_count == 1
# Second request: served from cached 404 marker
resp = await client.get("/api/brands/integration/nothing/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
assert aioclient_mock.call_count == 1 # No additional CDN call
async def test_stale_cache_triggers_background_refresh(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that stale cache entries trigger background refresh."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/hue/icon.png",
content=FAKE_PNG,
)
client = await hass_client()
# Prime the cache
resp = await client.get("/api/brands/integration/hue/icon.png")
assert resp.status == HTTPStatus.OK
assert aioclient_mock.call_count == 1
# Make the cache stale by backdating the file mtime
cache_path = (
Path(hass.config.cache_path(DOMAIN)) / "integrations" / "hue" / "icon.png"
)
assert cache_path.is_file()
stale_time = time.time() - CACHE_TTL - 1
os.utime(cache_path, (stale_time, stale_time))
# Request with stale cache should still return cached data
# but trigger a background refresh
resp = await client.get("/api/brands/integration/hue/icon.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
# Wait for the background task to complete
await hass.async_block_till_done()
# Background refresh should have fetched from CDN again
assert aioclient_mock.call_count == 2
async def test_stale_cache_404_marker_with_placeholder(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that stale cached 404 serves placeholder by default."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/gone/icon.png",
status=HTTPStatus.NOT_FOUND,
)
aioclient_mock.get(
f"{BRANDS_CDN_URL}/_/_placeholder/icon.png",
content=FAKE_PNG,
)
client = await hass_client()
# First request caches the 404 (with placeholder=no)
resp = await client.get("/api/brands/integration/gone/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
assert aioclient_mock.call_count == 1
# Make the cache stale
cache_path = (
Path(hass.config.cache_path(DOMAIN)) / "integrations" / "gone" / "icon.png"
)
assert cache_path.is_file()
stale_time = time.time() - CACHE_TTL - 1
os.utime(cache_path, (stale_time, stale_time))
# Stale 404 with default placeholder serves the placeholder
resp = await client.get("/api/brands/integration/gone/icon.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
async def test_stale_cache_404_marker_no_placeholder(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that stale cached 404 with placeholder=no returns 404."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/gone/icon.png",
status=HTTPStatus.NOT_FOUND,
)
client = await hass_client()
# First request caches the 404
resp = await client.get("/api/brands/integration/gone/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
assert aioclient_mock.call_count == 1
# Make the cache stale
cache_path = (
Path(hass.config.cache_path(DOMAIN)) / "integrations" / "gone" / "icon.png"
)
assert cache_path.is_file()
stale_time = time.time() - CACHE_TTL - 1
os.utime(cache_path, (stale_time, stale_time))
# Stale 404 with placeholder=no still returns 404
resp = await client.get("/api/brands/integration/gone/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
# Background refresh should have been triggered
await hass.async_block_till_done()
assert aioclient_mock.call_count == 2
# ------------------------------------------------------------------
# Custom integration brand files
# ------------------------------------------------------------------
async def test_custom_integration_brand_served(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that custom integration brand files are served."""
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
# Create the brand file on disk
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
(brand_dir / "icon.png").write_bytes(FAKE_PNG)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/icon.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
# Should not have called CDN
assert aioclient_mock.call_count == 0
async def test_custom_integration_no_brand_falls_through(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that custom integration without brand falls through to CDN."""
custom = _create_custom_integration(hass, "my_custom", has_branding=False)
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/my_custom/icon.png",
content=FAKE_PNG,
)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/icon.png")
assert resp.status == HTTPStatus.OK
assert aioclient_mock.call_count == 1
async def test_custom_integration_brand_missing_file_falls_through(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that custom integration with brand dir but missing file falls through."""
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
# Create the brand directory but NOT the requested file
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/my_custom/icon.png",
content=FAKE_PNG,
)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/icon.png")
assert resp.status == HTTPStatus.OK
assert aioclient_mock.call_count == 1
async def test_custom_integration_takes_priority_over_cache(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that custom integration brand takes priority over disk cache."""
custom_png = b"\x89PNGcustom"
# Prime the CDN cache first
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/my_custom/icon.png",
content=FAKE_PNG,
)
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/icon.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
# Now create a custom integration with brand
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
(brand_dir / "icon.png").write_bytes(custom_png)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
resp = await client.get("/api/brands/integration/my_custom/icon.png")
# Custom integration brand takes priority
assert resp.status == HTTPStatus.OK
assert await resp.read() == custom_png
# ------------------------------------------------------------------
# Custom integration image fallback chains
# ------------------------------------------------------------------
async def test_custom_integration_logo_falls_back_to_icon(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that requesting logo.png falls back to icon.png for custom integrations."""
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
(brand_dir / "icon.png").write_bytes(FAKE_PNG)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/logo.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
assert aioclient_mock.call_count == 0
async def test_custom_integration_dark_icon_falls_back_to_icon(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that dark_icon.png falls back to icon.png for custom integrations."""
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
(brand_dir / "icon.png").write_bytes(FAKE_PNG)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/dark_icon.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
assert aioclient_mock.call_count == 0
async def test_custom_integration_dark_logo_falls_back_through_chain(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that dark_logo.png walks the full fallback chain."""
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
# Only icon.png exists; dark_logo → dark_icon → logo → icon
(brand_dir / "icon.png").write_bytes(FAKE_PNG)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/dark_logo.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
assert aioclient_mock.call_count == 0
async def test_custom_integration_dark_logo_prefers_dark_icon(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that dark_logo.png prefers dark_icon.png over icon.png."""
dark_icon_data = b"\x89PNGdarkicon"
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
(brand_dir / "icon.png").write_bytes(FAKE_PNG)
(brand_dir / "dark_icon.png").write_bytes(dark_icon_data)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/dark_logo.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == dark_icon_data
async def test_custom_integration_icon2x_falls_back_to_icon(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that icon@2x.png falls back to icon.png."""
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
(brand_dir / "icon.png").write_bytes(FAKE_PNG)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/icon@2x.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
assert aioclient_mock.call_count == 0
async def test_custom_integration_logo2x_falls_back_to_logo_then_icon(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that logo@2x.png falls back to logo.png then icon.png."""
logo_data = b"\x89PNGlogodata"
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
(brand_dir / "icon.png").write_bytes(FAKE_PNG)
(brand_dir / "logo.png").write_bytes(logo_data)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/logo@2x.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == logo_data
async def test_custom_integration_no_fallback_match_falls_through_to_cdn(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that if no fallback image exists locally, we fall through to CDN."""
custom = _create_custom_integration(hass, "my_custom", has_branding=True)
brand_dir = Path(custom.file_path) / "brand"
brand_dir.mkdir(parents=True, exist_ok=True)
# brand dir exists but is empty - no icon.png either
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/my_custom/icon.png",
content=FAKE_PNG,
)
with patch(
"homeassistant.components.brands.async_get_custom_components",
return_value={"my_custom": custom},
):
client = await hass_client()
resp = await client.get("/api/brands/integration/my_custom/icon.png")
assert resp.status == HTTPStatus.OK
assert aioclient_mock.call_count == 1
# ------------------------------------------------------------------
# Hardware view: /api/brands/hardware/{category}/{image:.+}
# ------------------------------------------------------------------
async def test_hardware_view_serves_from_cdn(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test serving a hardware brand image from CDN."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/hardware/boards/green.png",
content=FAKE_PNG,
)
client = await hass_client()
resp = await client.get("/api/brands/hardware/boards/green.png")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
async def test_hardware_view_invalid_category(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
) -> None:
"""Test that invalid category names return 404."""
client = await hass_client()
resp = await client.get("/api/brands/hardware/INVALID/board.png")
assert resp.status == HTTPStatus.NOT_FOUND
async def test_hardware_view_invalid_image_extension(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
) -> None:
"""Test that non-png image names return 404."""
client = await hass_client()
resp = await client.get("/api/brands/hardware/boards/image.jpg")
assert resp.status == HTTPStatus.NOT_FOUND
async def test_hardware_view_invalid_image_characters(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
) -> None:
"""Test that image names with invalid characters return 404."""
client = await hass_client()
resp = await client.get("/api/brands/hardware/boards/Bad-Name.png")
assert resp.status == HTTPStatus.NOT_FOUND
resp = await client.get("/api/brands/hardware/boards/../etc.png")
assert resp.status == HTTPStatus.NOT_FOUND
# ------------------------------------------------------------------
# CDN timeout handling
# ------------------------------------------------------------------
async def test_cdn_timeout_returns_404(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that CDN timeout results in 404 with placeholder=no."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/slow/icon.png",
exc=TimeoutError(),
)
client = await hass_client()
resp = await client.get("/api/brands/integration/slow/icon.png?placeholder=no")
assert resp.status == HTTPStatus.NOT_FOUND
# ------------------------------------------------------------------
# Authentication
# ------------------------------------------------------------------
async def test_authenticated_request(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that authenticated requests succeed."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/hue/icon.png",
content=FAKE_PNG,
)
client = await hass_client()
resp = await client.get("/api/brands/integration/hue/icon.png")
assert resp.status == HTTPStatus.OK
async def test_token_query_param_authentication(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that a valid access token in query param authenticates."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/hue/icon.png",
content=FAKE_PNG,
)
token = hass.data[DOMAIN][-1]
client = await hass_client_no_auth()
resp = await client.get(f"/api/brands/integration/hue/icon.png?token={token}")
assert resp.status == HTTPStatus.OK
assert await resp.read() == FAKE_PNG
async def test_unauthenticated_request_forbidden(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that unauthenticated requests are forbidden."""
client = await hass_client_no_auth()
resp = await client.get("/api/brands/integration/hue/icon.png")
assert resp.status == HTTPStatus.FORBIDDEN
resp = await client.get("/api/brands/hardware/boards/green.png")
assert resp.status == HTTPStatus.FORBIDDEN
async def test_invalid_token_forbidden(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
) -> None:
"""Test that an invalid access token in query param is forbidden."""
client = await hass_client_no_auth()
resp = await client.get("/api/brands/integration/hue/icon.png?token=invalid_token")
assert resp.status == HTTPStatus.FORBIDDEN
async def test_invalid_bearer_token_unauthorized(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
) -> None:
"""Test that an invalid Bearer token returns unauthorized."""
client = await hass_client_no_auth()
resp = await client.get(
"/api/brands/integration/hue/icon.png",
headers={"Authorization": "Bearer invalid_token"},
)
assert resp.status == HTTPStatus.UNAUTHORIZED
async def test_token_rotation(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test that access tokens rotate over time."""
aioclient_mock.get(
f"{BRANDS_CDN_URL}/brands/hue/icon.png",
content=FAKE_PNG,
)
original_token = hass.data[DOMAIN][-1]
client = await hass_client_no_auth()
# Original token works
resp = await client.get(
f"/api/brands/integration/hue/icon.png?token={original_token}"
)
assert resp.status == HTTPStatus.OK
# Trigger token rotation
freezer.tick(TOKEN_CHANGE_INTERVAL + timedelta(seconds=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
# Deque now contains a different newest token
new_token = hass.data[DOMAIN][-1]
assert new_token != original_token
# New token works
resp = await client.get(f"/api/brands/integration/hue/icon.png?token={new_token}")
assert resp.status == HTTPStatus.OK
# ------------------------------------------------------------------
# WebSocket API
# ------------------------------------------------------------------
async def test_ws_access_token(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test the brands/access_token WebSocket command."""
client = await hass_ws_client(hass)
await client.send_json({"id": 1, "type": "brands/access_token"})
resp = await client.receive_json()
assert resp["success"]
assert resp["result"]["token"] == hass.data[DOMAIN][-1]

View File

@@ -9,7 +9,7 @@
'media_class': 'directory',
'media_content_id': '',
'media_content_type': 'presets',
'thumbnail': 'https://brands.home-assistant.io/_/cambridge_audio/logo.png',
'thumbnail': '/api/brands/integration/cambridge_audio/logo.png',
'title': 'Presets',
}),
])

View File

@@ -2167,7 +2167,7 @@ async def test_cast_platform_browse_media(
media_class=MediaClass.APP,
media_content_id="",
media_content_type="spotify",
thumbnail="https://brands.home-assistant.io/_/spotify/logo.png",
thumbnail="/api/brands/integration/spotify/logo.png",
can_play=False,
can_expand=True,
)
@@ -2219,7 +2219,7 @@ async def test_cast_platform_browse_media(
"can_play": False,
"can_expand": True,
"can_search": False,
"thumbnail": "https://brands.home-assistant.io/_/spotify/logo.png",
"thumbnail": "/api/brands/integration/spotify/logo.png",
"children_media_class": None,
}
assert expected_child in response["result"]["children"]

Some files were not shown because too many files have changed in this diff Show More