Compare commits

..

18 Commits

Author SHA1 Message Date
farmio a385700cc4 Add optional compression to Store 2026-04-23 15:16:05 +02:00
renovate[bot] 30d362dc8e Update uv to 0.11.7 (#168864)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-04-23 10:43:51 +02:00
Erik Montnemery 67c818c7a8 Add comment to trigger base class (#168882) 2026-04-23 10:42:07 +02:00
epenet 5927f50bd2 Use runtime_data in Huawei LTE (#168876)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 09:48:45 +02:00
epenet 66d7afa442 Migrate flux_led to use HassKey for FLUX_LED_DISCOVERY (#168872)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 08:56:20 +02:00
epenet 51fcdaff7a Migrate slimproto to use runtime_data (#168869)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 08:55:37 +02:00
Raphael Hehl 67baec27cf unifi_access: add missing WebSocket handlers for remote_view and device_update events (#168850)
Co-authored-by: RaHehl <rahehl@users.noreply.github.com>
2026-04-23 08:50:09 +02:00
epenet d45941d648 Migrate kraken to use runtime_data (#168870)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 08:24:56 +02:00
Raphael Hehl a338d04441 unifi_access: bump py-unifi-access to 1.3.0 (#168851)
Co-authored-by: RaHehl <rahehl@users.noreply.github.com>
2026-04-23 08:24:41 +02:00
epenet 69eca62446 Clean up leftover hass.data[DOMAIN] usage in keenetic_ndms2 (#168871)
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-23 08:20:48 +02:00
Franck Nijhof 507b5f1bbf Add pylint plugin to detect polling interval fields in config flows (#168849) 2026-04-22 23:41:43 +02:00
A. Gideonse ee8a15b368 Fix incorrect sensor definition for Indevolt Gen-1 devices (#168835)
Co-authored-by: Ariel Ebersberger <31776703+justanotherariel@users.noreply.github.com>
2026-04-22 22:03:13 +02:00
Erik Montnemery 7f92d88606 Replace climate-control device with thermostat in climate translations (#161419) 2026-04-22 21:02:54 +02:00
epenet cc1c5e788f Revert Tuya camera quirk changes (#168820) 2026-04-22 20:54:49 +02:00
epenet 1159946391 Bump tuya-device-handlers to 0.0.18 (#168821) 2026-04-22 20:53:37 +02:00
Erik Montnemery 46208c034e Add tests asserting air_quality condition features (#168731) 2026-04-22 20:42:42 +02:00
puddly abdd132bdc Register optimized ESPHome serial proxy transport with serialx (#168817) 2026-04-22 13:16:56 -04:00
Denis Shulyaka 1b71ef2a60 Add gpt-image-2 model support for OpenAI (#168826) 2026-04-22 18:13:04 +01:00
69 changed files with 1760 additions and 438 deletions
+56 -56
View File
@@ -9,34 +9,34 @@
},
"conditions": {
"is_cooling": {
"description": "Tests if one or more climate-control devices are cooling.",
"description": "Tests if one or more thermostats are cooling.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
}
},
"name": "Climate-control device is cooling"
"name": "Thermostat is cooling"
},
"is_drying": {
"description": "Tests if one or more climate-control devices are drying.",
"description": "Tests if one or more thermostats are drying.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
}
},
"name": "Climate-control device is drying"
"name": "Thermostat is drying"
},
"is_heating": {
"description": "Tests if one or more climate-control devices are heating.",
"description": "Tests if one or more thermostats are heating.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
}
},
"name": "Climate-control device is heating"
"name": "Thermostat is heating"
},
"is_hvac_mode": {
"description": "Tests if one or more climate-control devices are set to a specific HVAC mode.",
"description": "Tests if one or more thermostats are set to a specific HVAC mode.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
@@ -46,10 +46,10 @@
"name": "Modes"
}
},
"name": "Climate-control device HVAC mode"
"name": "Thermostat HVAC mode"
},
"is_off": {
"description": "Tests if one or more climate-control devices are off.",
"description": "Tests if one or more thermostats are off.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
@@ -58,19 +58,19 @@
"name": "[%key:component::climate::common::condition_for_name%]"
}
},
"name": "Climate-control device is off"
"name": "Thermostat is off"
},
"is_on": {
"description": "Tests if one or more climate-control devices are on.",
"description": "Tests if one or more thermostats are on.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
}
},
"name": "Climate-control device is on"
"name": "Thermostat is on"
},
"target_humidity": {
"description": "Tests the humidity setpoint of one or more climate-control devices.",
"description": "Tests the humidity setpoint of one or more thermostats.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
@@ -79,10 +79,10 @@
"name": "[%key:component::climate::common::condition_threshold_name%]"
}
},
"name": "Climate-control device target humidity"
"name": "Thermostat target humidity"
},
"target_temperature": {
"description": "Tests the temperature setpoint of one or more climate-control devices.",
"description": "Tests the temperature setpoint of one or more thermostats.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::condition_behavior_name%]"
@@ -91,7 +91,7 @@
"name": "[%key:component::climate::common::condition_threshold_name%]"
}
},
"name": "Climate-control device target temperature"
"name": "Thermostat target temperature"
}
},
"device_automation": {
@@ -288,67 +288,67 @@
},
"services": {
"set_fan_mode": {
"description": "Sets the fan mode of a climate-control device.",
"description": "Sets the fan mode of a thermostat.",
"fields": {
"fan_mode": {
"description": "Fan operation mode.",
"name": "Fan mode"
}
},
"name": "Set climate-control device fan mode"
"name": "Set thermostat fan mode"
},
"set_humidity": {
"description": "Sets the target humidity of a climate-control device.",
"description": "Sets the target humidity of a thermostat.",
"fields": {
"humidity": {
"description": "Target humidity.",
"name": "Humidity"
}
},
"name": "Set climate-control device target humidity"
"name": "Set thermostat target humidity"
},
"set_hvac_mode": {
"description": "Sets the HVAC mode of a climate-control device.",
"description": "Sets the HVAC mode of a thermostat.",
"fields": {
"hvac_mode": {
"description": "HVAC operation mode.",
"name": "HVAC mode"
}
},
"name": "Set climate-control device HVAC mode"
"name": "Set thermostat HVAC mode"
},
"set_preset_mode": {
"description": "Sets the preset mode of a climate-control device.",
"description": "Sets the preset mode of a thermostat.",
"fields": {
"preset_mode": {
"description": "Preset mode.",
"name": "Preset mode"
}
},
"name": "Set climate-control device preset mode"
"name": "Set thermostat preset mode"
},
"set_swing_horizontal_mode": {
"description": "Sets the horizontal swing mode of a climate-control device.",
"description": "Sets the horizontal swing mode of a thermostat.",
"fields": {
"swing_horizontal_mode": {
"description": "Horizontal swing operation mode.",
"name": "Horizontal swing mode"
}
},
"name": "Set climate-control device horizontal swing mode"
"name": "Set thermostat horizontal swing mode"
},
"set_swing_mode": {
"description": "Sets the swing mode of a climate-control device.",
"description": "Sets the swing mode of a thermostat.",
"fields": {
"swing_mode": {
"description": "Swing operation mode.",
"name": "Swing mode"
}
},
"name": "Set climate-control device swing mode"
"name": "Set thermostat swing mode"
},
"set_temperature": {
"description": "Sets the target temperature of a climate-control device.",
"description": "Sets the target temperature of a thermostat.",
"fields": {
"hvac_mode": {
"description": "HVAC operation mode.",
@@ -367,25 +367,25 @@
"name": "Target temperature"
}
},
"name": "Set climate-control device target temperature"
"name": "Set thermostat target temperature"
},
"toggle": {
"description": "Toggles a climate-control device on/off.",
"name": "Toggle climate-control device"
"description": "Toggles a thermostat on/off.",
"name": "Toggle thermostat"
},
"turn_off": {
"description": "Turns off a climate-control device.",
"name": "Turn off climate-control device"
"description": "Turns off a thermostat.",
"name": "Turn off thermostat"
},
"turn_on": {
"description": "Turns on a climate-control device.",
"name": "Turn on climate-control device"
"description": "Turns on a thermostat.",
"name": "Turn on thermostat"
}
},
"title": "Climate",
"triggers": {
"hvac_mode_changed": {
"description": "Triggers after the mode of one or more climate-control devices changes.",
"description": "Triggers after the mode of one or more thermostats changes.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -398,10 +398,10 @@
"name": "Modes"
}
},
"name": "Climate-control device mode changed"
"name": "Thermostat mode changed"
},
"started_cooling": {
"description": "Triggers after one or more climate-control devices start cooling.",
"description": "Triggers after one or more thermostats start cooling.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -410,10 +410,10 @@
"name": "[%key:component::climate::common::trigger_for_name%]"
}
},
"name": "Climate-control device started cooling"
"name": "Thermostat started cooling"
},
"started_drying": {
"description": "Triggers after one or more climate-control devices start drying.",
"description": "Triggers after one or more thermostats start drying.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -422,10 +422,10 @@
"name": "[%key:component::climate::common::trigger_for_name%]"
}
},
"name": "Climate-control device started drying"
"name": "Thermostat started drying"
},
"started_heating": {
"description": "Triggers after one or more climate-control devices start heating.",
"description": "Triggers after one or more thermostats start heating.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -434,19 +434,19 @@
"name": "[%key:component::climate::common::trigger_for_name%]"
}
},
"name": "Climate-control device started heating"
"name": "Thermostat started heating"
},
"target_humidity_changed": {
"description": "Triggers after the humidity setpoint of one or more climate-control devices changes.",
"description": "Triggers after the humidity setpoint of one or more thermostats changes.",
"fields": {
"threshold": {
"name": "[%key:component::climate::common::trigger_threshold_name%]"
}
},
"name": "Climate-control device target humidity changed"
"name": "Thermostat target humidity changed"
},
"target_humidity_crossed_threshold": {
"description": "Triggers after the humidity setpoint of one or more climate-control devices crosses a threshold.",
"description": "Triggers after the humidity setpoint of one or more thermostats crosses a threshold.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -458,19 +458,19 @@
"name": "[%key:component::climate::common::trigger_threshold_name%]"
}
},
"name": "Climate-control device target humidity crossed threshold"
"name": "Thermostat target humidity crossed threshold"
},
"target_temperature_changed": {
"description": "Triggers after the temperature setpoint of one or more climate-control devices changes.",
"description": "Triggers after the temperature setpoint of one or more thermostats changes.",
"fields": {
"threshold": {
"name": "[%key:component::climate::common::trigger_threshold_name%]"
}
},
"name": "Climate-control device target temperature changed"
"name": "Thermostat target temperature changed"
},
"target_temperature_crossed_threshold": {
"description": "Triggers after the temperature setpoint of one or more climate-control devices crosses a threshold.",
"description": "Triggers after the temperature setpoint of one or more thermostats crosses a threshold.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -482,10 +482,10 @@
"name": "[%key:component::climate::common::trigger_threshold_name%]"
}
},
"name": "Climate-control device target temperature crossed threshold"
"name": "Thermostat target temperature crossed threshold"
},
"turned_off": {
"description": "Triggers after one or more climate-control devices turn off.",
"description": "Triggers after one or more thermostats turn off.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -494,10 +494,10 @@
"name": "[%key:component::climate::common::trigger_for_name%]"
}
},
"name": "Climate-control device turned off"
"name": "Thermostat turned off"
},
"turned_on": {
"description": "Triggers after one or more climate-control devices turn on, regardless of the mode.",
"description": "Triggers after one or more thermostats turn on, regardless of the mode.",
"fields": {
"behavior": {
"name": "[%key:component::climate::common::trigger_behavior_name%]"
@@ -506,7 +506,7 @@
"name": "[%key:component::climate::common::trigger_for_name%]"
}
},
"name": "Climate-control device turned on"
"name": "Thermostat turned on"
}
}
}
@@ -169,6 +169,8 @@ class OptionsFlowHandler(OptionsFlowWithReload):
data_schema = vol.Schema(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
+44 -2
View File
@@ -8,18 +8,24 @@ from aioesphomeapi import APIClient, APIConnectionError
from homeassistant.components import zeroconf
from homeassistant.components.bluetooth import async_remove_scanner
from homeassistant.components.usb import (
SerialDevice,
USBDevice,
async_register_serial_port_scanner,
)
from homeassistant.const import (
CONF_HOST,
CONF_PASSWORD,
CONF_PORT,
__version__ as ha_version,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.issue_registry import async_delete_issue
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import slugify
from . import assist_satellite, dashboard, ffmpeg_proxy
from . import assist_satellite, dashboard, ffmpeg_proxy, serial_proxy
from .const import CONF_BLUETOOTH_MAC_ADDRESS, CONF_NOISE_PSK, DOMAIN
from .domain_data import DomainData
from .encryption_key_storage import async_get_encryption_key_storage
@@ -34,12 +40,48 @@ CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
CLIENT_INFO = f"Home Assistant {ha_version}"
@callback
def _async_scan_serial_ports(
hass: HomeAssistant,
) -> list[USBDevice | SerialDevice]:
"""Return serial-proxy ports exposed by connected ESPHome devices."""
ports: list[USBDevice | SerialDevice] = []
for entry in hass.config_entries.async_loaded_entries(DOMAIN):
entry_data = entry.runtime_data
if not entry_data.available:
continue
device_info = entry_data.device_info
if device_info is None:
continue
ports.extend(
SerialDevice(
device=str(serial_proxy.build_url(entry.entry_id, proxy.name)),
serial_number=(
device_info.mac_address.replace(":", "") + "-" + slugify(proxy.name)
),
manufacturer=device_info.manufacturer,
description=f"{device_info.model} ({proxy.name})",
)
for proxy in device_info.serial_proxies
)
return ports
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the esphome component."""
ffmpeg_proxy.async_setup(hass)
await assist_satellite.async_setup(hass)
await dashboard.async_setup(hass)
async_setup_websocket_api(hass)
if "usb" in hass.config.components:
async_register_serial_port_scanner(hass, _async_scan_serial_ports)
serial_proxy.set_hass_loop(hass.loop)
return True
@@ -1,7 +1,7 @@
{
"domain": "esphome",
"name": "ESPHome",
"after_dependencies": ["hassio", "zeroconf", "tag"],
"after_dependencies": ["hassio", "tag", "usb", "zeroconf"],
"codeowners": ["@jesserockz", "@kbx81", "@bdraco"],
"config_flow": true,
"dependencies": ["assist_pipeline", "bluetooth", "intent", "ffmpeg", "http"],
@@ -0,0 +1,113 @@
"""Home Assistant-aware ESPHome serial proxy URI handler for serialx."""
from __future__ import annotations
import asyncio
from typing import cast
from aioesphomeapi import APIClient
from serialx import register_uri_handler
from serialx.platforms.serial_esphome import (
ESPHomeSerial,
ESPHomeSerialTransport,
InvalidSettingsError,
)
from yarl import URL
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant, async_get_hass
from .const import DOMAIN
from .entry_data import ESPHomeConfigEntry
SCHEME = "esphome-hass://"
# This is required so that serialx can safely query Core for an instance of an
# aioesphomeapi client. We cannot make any assumptions here, some packages run separate
# asyncio event loops in dedicated threads.
_HASS_LOOP: asyncio.AbstractEventLoop | None = None
def set_hass_loop(loop: asyncio.AbstractEventLoop) -> None:
"""Store a reference to the Core event loop."""
global _HASS_LOOP # noqa: PLW0603 # pylint: disable=global-statement
_HASS_LOOP = loop
def build_url(entry_id: str, port_name: str) -> URL:
"""Build a canonical `esphome-hass://` URL."""
return URL.build(
scheme="esphome-hass",
host="esphome",
path=f"/{entry_id}",
query={"port_name": port_name},
)
async def _resolve_client(entry_id: str) -> APIClient:
"""Look up the `APIClient` for a specific config entry."""
# This function is async specifically so that we can get a reference to the Home
# Assistant Core instance from its own thread
hass: HomeAssistant = async_get_hass()
entry = cast(ESPHomeConfigEntry, hass.config_entries.async_get_entry(entry_id))
if entry is None or entry.domain != DOMAIN:
raise InvalidSettingsError(f"No ESPHome config entry with id {entry_id!r}")
if entry.state is not ConfigEntryState.LOADED:
raise InvalidSettingsError(f"ESPHome config entry {entry_id!r} is not loaded")
return entry.runtime_data.client
class HassESPHomeSerial(ESPHomeSerial):
"""ESPHomeSerial that resolves an HA config entry's APIClient from the URL."""
_api: APIClient | None
_path: str | None
async def _async_open(self) -> None:
"""Resolve the HA config entry's APIClient, then open the proxy."""
if self._api is None and self._path is not None:
parsed = URL(str(self._path))
entry_id = parsed.path.lstrip("/")
if not entry_id:
raise InvalidSettingsError(
f"No ESPHome config entry id in URL {self._path!r}"
)
if "port_name" not in parsed.query:
raise InvalidSettingsError("Port name is required")
self._port_name = parsed.query["port_name"]
hass_loop = _HASS_LOOP
if hass_loop is None:
raise InvalidSettingsError(
"ESPHome integration has not registered its event loop"
)
# Fetch the `APIClient` from the Core via the appropriate event loop
self._api = await asyncio.wrap_future(
asyncio.run_coroutine_threadsafe(_resolve_client(entry_id), hass_loop)
)
self._client_loop = self._api._loop # noqa: SLF001
await super()._async_open()
class HassESPHomeSerialTransport(ESPHomeSerialTransport):
"""Transport variant that constructs :class:`HassESPHomeSerial`."""
transport_name = "esphome-hass"
_serial_cls = HassESPHomeSerial
register_uri_handler(
scheme=SCHEME,
unique_scheme=SCHEME,
sync_cls=HassESPHomeSerial,
async_transport_cls=HassESPHomeSerialTransport,
)
@@ -87,8 +87,7 @@ def async_wifi_bulb_for_host(
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the flux_led component."""
domain_data = hass.data.setdefault(DOMAIN, {})
domain_data[FLUX_LED_DISCOVERY] = []
hass.data[FLUX_LED_DISCOVERY] = []
@callback
def _async_start_background_discovery(*_: Any) -> None:
+3 -1
View File
@@ -9,8 +9,10 @@ from flux_led.const import (
COLOR_MODE_RGBW as FLUX_COLOR_MODE_RGBW,
COLOR_MODE_RGBWW as FLUX_COLOR_MODE_RGBWW,
)
from flux_led.scanner import FluxLEDDiscovery
from homeassistant.components.light import ColorMode
from homeassistant.util.hass_dict import HassKey
DOMAIN: Final = "flux_led"
@@ -34,7 +36,7 @@ DEFAULT_NETWORK_SCAN_INTERVAL: Final = 120
DEFAULT_SCAN_INTERVAL: Final = 5
DEFAULT_EFFECT_SPEED: Final = 50
FLUX_LED_DISCOVERY: Final = "flux_led_discovery"
FLUX_LED_DISCOVERY: HassKey[list[FluxLEDDiscovery]] = HassKey(DOMAIN)
FLUX_LED_EXCEPTIONS: Final = (
TimeoutError,
@@ -1,5 +1,4 @@
"""The Flux LED/MagicLight integration discovery."""
# pylint: disable=hass-use-runtime-data # Uses legacy hass.data[DOMAIN] pattern
from __future__ import annotations
@@ -154,8 +153,7 @@ def async_update_entry_from_discovery(
@callback
def async_get_discovery(hass: HomeAssistant, host: str) -> FluxLEDDiscovery | None:
"""Check if a device was already discovered via a broadcast discovery."""
discoveries: list[FluxLEDDiscovery] = hass.data[DOMAIN][FLUX_LED_DISCOVERY]
for discovery in discoveries:
for discovery in hass.data[FLUX_LED_DISCOVERY]:
if discovery[ATTR_IPADDR] == host:
return discovery
return None
@@ -164,10 +162,10 @@ def async_get_discovery(hass: HomeAssistant, host: str) -> FluxLEDDiscovery | No
@callback
def async_clear_discovery_cache(hass: HomeAssistant, host: str) -> None:
"""Clear the host from the discovery cache."""
domain_data = hass.data[DOMAIN]
discoveries: list[FluxLEDDiscovery] = domain_data[FLUX_LED_DISCOVERY]
domain_data[FLUX_LED_DISCOVERY] = [
discovery for discovery in discoveries if discovery[ATTR_IPADDR] != host
hass.data[FLUX_LED_DISCOVERY] = [
discovery
for discovery in hass.data[FLUX_LED_DISCOVERY]
if discovery[ATTR_IPADDR] != host
]
@@ -219,6 +219,8 @@ class HiveOptionsFlowHandler(OptionsFlow):
schema = vol.Schema(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Optional(CONF_SCAN_INTERVAL, default=self.interval): vol.All(
vol.Coerce(int), vol.Range(min=30)
)
+23 -24
View File
@@ -1,5 +1,4 @@
"""Support for Huawei LTE routers."""
# pylint: disable=hass-use-runtime-data # Uses legacy hass.data[DOMAIN] pattern
from __future__ import annotations
@@ -9,7 +8,7 @@ from contextlib import suppress
from dataclasses import dataclass, field
from datetime import timedelta
import logging
from typing import Any, NamedTuple, cast
from typing import Any, cast
from xml.parsers.expat import ExpatError
from huawei_lte_api.Client import Client
@@ -64,6 +63,7 @@ from .const import (
DEFAULT_MANUFACTURER,
DEFAULT_NOTIFY_SERVICE_NAME,
DOMAIN,
HUAWEI_LTE_CONFIG,
KEY_DEVICE_BASIC_INFORMATION,
KEY_DEVICE_INFORMATION,
KEY_DEVICE_SIGNAL,
@@ -108,7 +108,7 @@ class Router:
"""Class for router state."""
hass: HomeAssistant
config_entry: ConfigEntry
config_entry: HuaweiLteConfigEntry
connection: Connection
url: str
@@ -278,14 +278,10 @@ class Router:
self.connection.requests_session.close()
class HuaweiLteData(NamedTuple):
"""Shared state."""
hass_config: ConfigType
routers: dict[str, Router]
type HuaweiLteConfigEntry = ConfigEntry[Router]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: HuaweiLteConfigEntry) -> bool:
"""Set up Huawei LTE component from config entry."""
url = entry.data[CONF_URL]
@@ -352,7 +348,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return False
# Store reference to router
hass.data[DOMAIN].routers[entry.entry_id] = router
entry.runtime_data = router
# Clear all subscriptions, enabled entities will push back theirs
router.subscriptions.clear()
@@ -417,7 +413,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
CONF_NAME: entry.options.get(CONF_NAME, DEFAULT_NOTIFY_SERVICE_NAME),
CONF_RECIPIENT: entry.options.get(CONF_RECIPIENT),
},
hass.data[DOMAIN].hass_config,
hass.data[HUAWEI_LTE_CONFIG],
)
def _update_router(*_: Any) -> None:
@@ -440,15 +436,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return True
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
async def async_unload_entry(
hass: HomeAssistant, config_entry: HuaweiLteConfigEntry
) -> bool:
"""Unload config entry."""
# Forward config entry unload to platforms
await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
# Forget about the router and invoke its cleanup
router = hass.data[DOMAIN].routers.pop(config_entry.entry_id)
await hass.async_add_executor_job(router.cleanup)
# Invoke router cleanup
await hass.async_add_executor_job(config_entry.runtime_data.cleanup)
return True
@@ -456,8 +453,7 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up Huawei LTE component."""
if DOMAIN not in hass.data:
hass.data[DOMAIN] = HuaweiLteData(hass_config=config, routers={})
hass.data[HUAWEI_LTE_CONFIG] = config
def service_handler(service: ServiceCall) -> None:
"""Apply a service.
@@ -465,21 +461,22 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
We key this using the router URL instead of its unique id / serial number,
because the latter is not available anywhere in the UI.
"""
routers = hass.data[DOMAIN].routers
routers = [
entry.runtime_data
for entry in hass.config_entries.async_loaded_entries(DOMAIN)
]
if url := service.data.get(CONF_URL):
router = next(
(router for router in routers.values() if router.url == url), None
)
router = next((router for router in routers if router.url == url), None)
elif not routers:
_LOGGER.error("%s: no routers configured", service.service)
return
elif len(routers) == 1:
router = next(iter(routers.values()))
router = routers[0]
else:
_LOGGER.error(
"%s: more than one router configured, must specify one of URLs %s",
service.service,
sorted(router.url for router in routers.values()),
sorted(router.url for router in routers),
)
return
if not router:
@@ -509,7 +506,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
async def async_migrate_entry(
hass: HomeAssistant, config_entry: HuaweiLteConfigEntry
) -> bool:
"""Migrate config entry to new version."""
if config_entry.version == 1:
options = dict(config_entry.options)
@@ -12,13 +12,12 @@ from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import HuaweiLteConfigEntry
from .const import (
DOMAIN,
KEY_MONITORING_CHECK_NOTIFICATIONS,
KEY_MONITORING_STATUS,
KEY_WLAN_WIFI_FEATURE_SWITCH,
@@ -30,13 +29,11 @@ _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: HuaweiLteConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up from config entry."""
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
router = hass.data[DOMAIN].routers[config_entry.entry_id]
router = config_entry.runtime_data
entities: list[Entity] = []
if router.data.get(KEY_MONITORING_STATUS):
@@ -11,12 +11,11 @@ from homeassistant.components.button import (
ButtonEntity,
ButtonEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_platform
from .const import DOMAIN
from . import HuaweiLteConfigEntry
from .entity import HuaweiLteBaseEntityWithDevice
_LOGGER = logging.getLogger(__name__)
@@ -24,13 +23,11 @@ _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: HuaweiLteConfigEntry,
async_add_entities: entity_platform.AddConfigEntryEntitiesCallback,
) -> None:
"""Set up Huawei LTE buttons."""
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
router = hass.data[DOMAIN].routers[config_entry.entry_id]
router = config_entry.runtime_data
buttons = [
ClearTrafficStatisticsButton(router),
RestartButton(router),
@@ -21,12 +21,7 @@ from requests.exceptions import SSLError, Timeout
from url_normalize import url_normalize
import voluptuous as vol
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlow,
)
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult, OptionsFlow
from homeassistant.const import (
CONF_MAC,
CONF_NAME,
@@ -47,6 +42,7 @@ from homeassistant.helpers.service_info.ssdp import (
SsdpServiceInfo,
)
from . import HuaweiLteConfigEntry
from .const import (
CONF_MANUFACTURER,
CONF_TRACK_WIRED_CLIENTS,
@@ -76,7 +72,7 @@ class HuaweiLteConfigFlow(ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(
config_entry: ConfigEntry,
config_entry: HuaweiLteConfigEntry,
) -> HuaweiLteOptionsFlow:
"""Get options flow."""
return HuaweiLteOptionsFlow()
@@ -1,7 +1,12 @@
"""Huawei LTE constants."""
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey
DOMAIN = "huawei_lte"
HUAWEI_LTE_CONFIG: HassKey[ConfigType] = HassKey(DOMAIN)
CONF_MANUFACTURER = "manufacturer"
CONF_TRACK_WIRED_CLIENTS = "track_wired_clients"
CONF_UNAUTHENTICATED_MODE = "unauthenticated_mode"
@@ -9,7 +9,6 @@ from homeassistant.components.device_tracker import (
DOMAIN as DEVICE_TRACKER_DOMAIN,
ScannerEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.dispatcher import async_dispatcher_connect
@@ -17,11 +16,10 @@ from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util import snakecase
from . import Router
from . import HuaweiLteConfigEntry, Router
from .const import (
CONF_TRACK_WIRED_CLIENTS,
DEFAULT_TRACK_WIRED_CLIENTS,
DOMAIN,
KEY_LAN_HOST_INFO,
KEY_WLAN_HOST_LIST,
UPDATE_SIGNAL,
@@ -50,7 +48,7 @@ def _get_hosts(
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: HuaweiLteConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up from config entry."""
@@ -58,9 +56,7 @@ async def async_setup_entry(
# Grab hosts list once to examine whether the initial fetch has got some data for
# us, i.e. if wlan host list is supported. Only set up a subscription and proceed
# with adding and tracking entities if it is.
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
router = hass.data[DOMAIN].routers[config_entry.entry_id]
router = config_entry.runtime_data
if (hosts := _get_hosts(router, True)) is None:
return
@@ -5,10 +5,9 @@ from __future__ import annotations
from typing import Any
from homeassistant.components.diagnostics import async_redact_data
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import DOMAIN
from . import HuaweiLteConfigEntry
ENTRY_FIELDS_DATA_TO_REDACT = {
"mac",
@@ -74,13 +73,13 @@ TO_REDACT = {
async def async_get_config_entry_diagnostics(
hass: HomeAssistant, entry: ConfigEntry
hass: HomeAssistant, entry: HuaweiLteConfigEntry
) -> dict[str, Any]:
"""Return diagnostics for a config entry."""
return async_redact_data(
{
"entry": entry.data,
"router": hass.data[DOMAIN].routers[entry.entry_id].data,
"router": entry.runtime_data.data,
},
TO_REDACT,
)
@@ -12,8 +12,7 @@ from homeassistant.const import ATTR_CONFIG_ENTRY_ID, CONF_RECIPIENT
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from . import Router
from .const import DOMAIN
from . import HuaweiLteConfigEntry, Router
_LOGGER = logging.getLogger(__name__)
@@ -27,9 +26,11 @@ async def async_get_service(
if discovery_info is None:
return None
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
router = hass.data[DOMAIN].routers[discovery_info[ATTR_CONFIG_ENTRY_ID]]
entry: HuaweiLteConfigEntry | None = hass.config_entries.async_get_entry(
discovery_info[ATTR_CONFIG_ENTRY_ID]
)
assert entry is not None
router = entry.runtime_data
default_targets = discovery_info[CONF_RECIPIENT] or []
return HuaweiLteSmsNotificationService(router, default_targets)
@@ -22,7 +22,7 @@ rules:
entity-event-setup: done
entity-unique-id: done
has-entity-name: done
runtime-data: todo
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
@@ -6,6 +6,7 @@ from collections.abc import Callable
from dataclasses import dataclass
from functools import partial
import logging
from typing import Any
from huawei_lte_api.enums.net import LTEBandEnum, NetworkBandEnum, NetworkModeEnum
@@ -14,14 +15,13 @@ from homeassistant.components.select import (
SelectEntity,
SelectEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import Router
from .const import DOMAIN, KEY_NET_NET_MODE
from . import HuaweiLteConfigEntry, Router
from .const import KEY_NET_NET_MODE
from .entity import HuaweiLteBaseEntityWithDevice
_LOGGER = logging.getLogger(__name__)
@@ -31,18 +31,16 @@ _LOGGER = logging.getLogger(__name__)
class HuaweiSelectEntityDescription(SelectEntityDescription):
"""Class describing Huawei LTE select entities."""
setter_fn: Callable[[str], None]
setter_fn: Callable[[str], Any]
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: HuaweiLteConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up from config entry."""
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
router = hass.data[DOMAIN].routers[config_entry.entry_id]
router = config_entry.runtime_data
selects: list[Entity] = []
desc = HuaweiSelectEntityDescription(
@@ -17,7 +17,6 @@ from homeassistant.components.sensor import (
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
PERCENTAGE,
EntityCategory,
@@ -31,9 +30,8 @@ from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.typing import StateType
from . import Router
from . import HuaweiLteConfigEntry, Router
from .const import (
DOMAIN,
KEY_DEVICE_INFORMATION,
KEY_DEVICE_SIGNAL,
KEY_MONITORING_CHECK_NOTIFICATIONS,
@@ -795,13 +793,11 @@ SENSOR_META: dict[str, HuaweiSensorGroup] = {
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: HuaweiLteConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up from config entry."""
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
router = hass.data[DOMAIN].routers[config_entry.entry_id]
router = config_entry.runtime_data
sensors: list[Entity] = []
for key in SENSOR_KEYS:
if not (items := router.data.get(key)):
+4 -10
View File
@@ -10,16 +10,12 @@ from homeassistant.components.switch import (
SwitchDeviceClass,
SwitchEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import (
DOMAIN,
KEY_DIALUP_MOBILE_DATASWITCH,
KEY_WLAN_WIFI_GUEST_NETWORK_SWITCH,
)
from . import HuaweiLteConfigEntry
from .const import KEY_DIALUP_MOBILE_DATASWITCH, KEY_WLAN_WIFI_GUEST_NETWORK_SWITCH
from .entity import HuaweiLteBaseEntityWithDevice
_LOGGER = logging.getLogger(__name__)
@@ -27,13 +23,11 @@ _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: HuaweiLteConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up from config entry."""
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
router = hass.data[DOMAIN].routers[config_entry.entry_id]
router = config_entry.runtime_data
switches: list[Entity] = []
if router.data.get(KEY_DIALUP_MOBILE_DATASWITCH):
@@ -43,7 +43,6 @@ NUMBERS: Final = (
native_max_value=100,
native_step=1,
native_unit_of_measurement=PERCENTAGE,
device_class=NumberDeviceClass.BATTERY,
),
IndevoltNumberEntityDescription(
key="max_ac_output_power",
+2 -4
View File
@@ -69,10 +69,8 @@ SENSORS: Final = (
IndevoltSensorEntityDescription(
key="6105",
generation=[1],
translation_key="rated_capacity",
native_unit_of_measurement=UnitOfEnergy.KILO_WATT_HOUR,
device_class=SensorDeviceClass.ENERGY,
state_class=SensorStateClass.TOTAL_INCREASING,
translation_key="discharge_limit",
native_unit_of_measurement=PERCENTAGE,
),
IndevoltSensorEntityDescription(
key="2101",
@@ -223,6 +223,9 @@
"dc_output_power": {
"name": "DC output power"
},
"discharge_limit": {
"name": "[%key:component::indevolt::entity::number::discharge_limit::name%]"
},
"energy_mode": {
"name": "Energy mode",
"state": {
@@ -4,7 +4,7 @@ from __future__ import annotations
import logging
from homeassistant.const import CONF_HOST, CONF_SCAN_INTERVAL, Platform
from homeassistant.const import CONF_SCAN_INTERVAL, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
@@ -17,7 +17,6 @@ from .const import (
DEFAULT_CONSIDER_HOME,
DEFAULT_INTERFACE,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
)
from .router import KeeneticConfigEntry, KeeneticRouter
@@ -27,7 +26,6 @@ _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass: HomeAssistant, entry: KeeneticConfigEntry) -> bool:
"""Set up the component."""
hass.data.setdefault(DOMAIN, {})
async_add_defaults(hass, entry)
router = KeeneticRouter(hass, entry)
@@ -85,12 +83,8 @@ async def async_unload_entry(
return unload_ok
def async_add_defaults(hass: HomeAssistant, entry: KeeneticConfigEntry):
def async_add_defaults(hass: HomeAssistant, entry: KeeneticConfigEntry) -> None:
"""Populate default options."""
host: str = entry.data[CONF_HOST]
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
imported_options: dict = hass.data[DOMAIN].get(f"imported_options_{host}", {})
options = {
CONF_SCAN_INTERVAL: DEFAULT_SCAN_INTERVAL,
CONF_CONSIDER_HOME: DEFAULT_CONSIDER_HOME,
@@ -98,7 +92,6 @@ def async_add_defaults(hass: HomeAssistant, entry: KeeneticConfigEntry):
CONF_TRY_HOTSPOT: True,
CONF_INCLUDE_ARP: True,
CONF_INCLUDE_ASSOCIATED: True,
**imported_options,
**entry.options,
}
@@ -198,6 +198,8 @@ class KeeneticOptionsFlowHandler(OptionsFlowWithReload):
options = vol.Schema(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Required(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
+14 -16
View File
@@ -1,41 +1,39 @@
"""The kraken integration."""
# pylint: disable=hass-use-runtime-data # Uses legacy hass.data[DOMAIN] pattern
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_SCAN_INTERVAL, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.dispatcher import async_dispatcher_send
from .const import DISPATCH_CONFIG_UPDATED, DOMAIN
from .coordinator import KrakenData
from .const import DISPATCH_CONFIG_UPDATED
from .coordinator import KrakenConfigEntry, KrakenData
PLATFORMS = [Platform.SENSOR]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: KrakenConfigEntry) -> bool:
"""Set up kraken from a config entry."""
kraken_data = KrakenData(hass, entry)
await kraken_data.async_setup()
hass.data[DOMAIN] = kraken_data
entry.runtime_data = kraken_data
entry.async_on_unload(entry.add_update_listener(async_options_updated))
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
async def async_unload_entry(
hass: HomeAssistant, config_entry: KrakenConfigEntry
) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
)
if unload_ok:
hass.data.pop(DOMAIN)
return unload_ok
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
async def async_options_updated(hass: HomeAssistant, config_entry: ConfigEntry) -> None:
async def async_options_updated(
hass: HomeAssistant, config_entry: KrakenConfigEntry
) -> None:
"""Triggered by config entry options updates."""
hass.data[DOMAIN].set_update_interval(config_entry.options[CONF_SCAN_INTERVAL])
config_entry.runtime_data.set_update_interval(
config_entry.options[CONF_SCAN_INTERVAL]
)
async_dispatcher_send(hass, DISPATCH_CONFIG_UPDATED, hass, config_entry)
@@ -8,17 +8,13 @@ import krakenex
from pykrakenapi.pykrakenapi import KrakenAPI
import voluptuous as vol
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlow,
)
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult, OptionsFlow
from homeassistant.const import CONF_SCAN_INTERVAL
from homeassistant.core import callback
from homeassistant.helpers import config_validation as cv
from .const import CONF_TRACKED_ASSET_PAIRS, DEFAULT_SCAN_INTERVAL, DOMAIN
from .coordinator import KrakenConfigEntry
from .utils import get_tradable_asset_pairs
@@ -30,7 +26,7 @@ class KrakenConfigFlow(ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(
config_entry: ConfigEntry,
config_entry: KrakenConfigEntry,
) -> KrakenOptionsFlowHandler:
"""Get the options flow for this handler."""
return KrakenOptionsFlowHandler()
@@ -79,6 +75,8 @@ class KrakenOptionsFlowHandler(OptionsFlow):
)
options = {
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
@@ -28,10 +28,13 @@ CALL_RATE_LIMIT_SLEEP = 1
_LOGGER = logging.getLogger(__name__)
type KrakenConfigEntry = ConfigEntry[KrakenData]
class KrakenData:
"""Define an object to hold kraken data."""
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
def __init__(self, hass: HomeAssistant, config_entry: KrakenConfigEntry) -> None:
"""Initialize."""
self._hass = hass
self._config_entry = config_entry
+6 -7
View File
@@ -11,7 +11,6 @@ from homeassistant.components.sensor import (
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import DeviceInfo
@@ -28,7 +27,7 @@ from .const import (
DOMAIN,
KrakenResponse,
)
from .coordinator import KrakenData
from .coordinator import KrakenConfigEntry, KrakenData
_LOGGER = logging.getLogger(__name__)
@@ -138,7 +137,7 @@ SENSOR_TYPES: tuple[KrakenSensorEntityDescription, ...] = (
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: KrakenConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Add kraken entities from a config_entry."""
@@ -149,9 +148,7 @@ async def async_setup_entry(
entities.extend(
[
KrakenSensor(
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
hass.data[DOMAIN],
config_entry.runtime_data,
tracked_asset_pair,
description,
)
@@ -163,7 +160,9 @@ async def async_setup_entry(
_async_add_kraken_sensors(config_entry.options[CONF_TRACKED_ASSET_PAIRS])
@callback
def async_update_sensors(hass: HomeAssistant, config_entry: ConfigEntry) -> None:
def async_update_sensors(
hass: HomeAssistant, config_entry: KrakenConfigEntry
) -> None:
"""Add or remove sensors for configured tracked asset pairs."""
dev_reg = dr.async_get(hass)
@@ -167,6 +167,8 @@ async def _async_build_schema_with_user_input(
if include_options:
schema.update(
{
# Approved exemption: nmap scan interval is user-configurable
# pylint: disable-next=hass-config-flow-polling-field
vol.Optional(
CONF_SCAN_INTERVAL,
default=user_input.get(CONF_SCAN_INTERVAL, TRACKER_SCAN_INTERVAL),
@@ -90,6 +90,8 @@ class OptionsFlowHandler(OptionsFlow):
step_id="init",
data_schema=vol.Schema(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
@@ -525,7 +525,12 @@ class OpenAISubentryFlowHandler(ConfigSubentryFlow):
vol.Optional(CONF_IMAGE_MODEL, default=RECOMMENDED_IMAGE_MODEL)
] = SelectSelector(
SelectSelectorConfig(
options=["gpt-image-1.5", "gpt-image-1", "gpt-image-1-mini"],
options=[
"gpt-image-2",
"gpt-image-1.5",
"gpt-image-1",
"gpt-image-1-mini",
],
mode=SelectSelectorMode.DROPDOWN,
)
)
@@ -40,7 +40,7 @@ CONF_WEB_SEARCH_TIMEZONE = "timezone"
CONF_WEB_SEARCH_INLINE_CITATIONS = "inline_citations"
RECOMMENDED_CODE_INTERPRETER = False
RECOMMENDED_CHAT_MODEL = "gpt-4o-mini"
RECOMMENDED_IMAGE_MODEL = "gpt-image-1.5"
RECOMMENDED_IMAGE_MODEL = "gpt-image-2"
RECOMMENDED_MAX_TOKENS = 3000
RECOMMENDED_REASONING_EFFORT = "low"
RECOMMENDED_STORE_RESPONSES = False
@@ -615,7 +615,7 @@ class OpenAIBaseLLMEntity(Entity):
model=image_model,
output_format="png",
)
if image_model != "gpt-image-1-mini":
if image_model not in ("gpt-image-1-mini", "gpt-image-2"):
image_tool["input_fidelity"] = "high"
tools.append(image_tool)
# Keep image state on OpenAI so follow-up prompts can continue by
@@ -210,6 +210,8 @@ class PlaatoOptionsFlowHandler(OptionsFlow):
step_id="user",
data_schema=vol.Schema(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
@@ -234,6 +234,8 @@ class RiscoOptionsFlowHandler(OptionsFlow):
self._data = {**DEFAULT_ADVANCED_OPTIONS, **self._data}
schema = schema.extend(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Required(
CONF_SCAN_INTERVAL, default=self._data[CONF_SCAN_INTERVAL]
): int,
@@ -206,6 +206,8 @@ class ScreenLogicOptionsFlowHandler(OptionsFlow):
step_id="init",
data_schema=vol.Schema(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Required(
CONF_SCAN_INTERVAL,
default=current_interval,
+10 -10
View File
@@ -2,26 +2,24 @@
from __future__ import annotations
from aioslimproto import SlimServer
from aioslimproto.server import SlimServer
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, Platform
from homeassistant.core import Event, HomeAssistant
from homeassistant.helpers import device_registry as dr
from .const import DOMAIN
PLATFORMS = [Platform.MEDIA_PLAYER]
type SlimProtoConfigEntry = ConfigEntry[SlimServer]
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: SlimProtoConfigEntry) -> bool:
"""Set up from a config entry."""
slimserver = SlimServer()
await slimserver.start()
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
hass.data[DOMAIN] = slimserver
entry.runtime_data = slimserver
# initialize platform(s)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
@@ -39,15 +37,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_remove_config_entry_device(
hass: HomeAssistant, config_entry: ConfigEntry, device_entry: dr.DeviceEntry
hass: HomeAssistant,
config_entry: SlimProtoConfigEntry,
device_entry: dr.DeviceEntry,
) -> bool:
"""Remove a config entry from a device."""
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: SlimProtoConfigEntry) -> bool:
"""Unload a config entry."""
unload_success = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if unload_success:
await hass.data.pop(DOMAIN).stop()
await entry.runtime_data.stop()
return unload_success
@@ -19,12 +19,12 @@ from homeassistant.components.media_player import (
MediaType,
async_process_play_media_url,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.util.dt import utcnow
from . import SlimProtoConfigEntry
from .const import DEFAULT_NAME, DOMAIN, PLAYER_EVENT
STATE_MAPPING = {
@@ -38,13 +38,11 @@ STATE_MAPPING = {
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: SlimProtoConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up SlimProto MediaPlayer(s) from Config Entry."""
# Uses legacy hass.data[DOMAIN] pattern
# pylint: disable-next=hass-use-runtime-data
slimserver: SlimServer = hass.data[DOMAIN]
slimserver = config_entry.runtime_data
added_ids = set()
async def async_add_player(player: SlimClient) -> None:
+2 -25
View File
@@ -2,9 +2,7 @@
from __future__ import annotations
from tuya_device_handlers import TUYA_QUIRKS_REGISTRY
from tuya_device_handlers.definition.camera import (
CameraQuirk,
TuyaCameraDefinition,
get_default_definition,
)
@@ -30,20 +28,6 @@ CAMERAS: dict[DeviceCategory, CameraEntityDescription] = {
}
def _get_quirk_entities(
manager: Manager, device: CustomerDevice
) -> list[TuyaCameraEntity] | None:
if (quirk := TUYA_QUIRKS_REGISTRY.get_quirk_for_device(device)) is None or (
entity_quirks := quirk.camera_quirks
) is None:
return None
return [
TuyaCameraEntity(device, manager, definition, quirk=entity_quirk)
for entity_quirk in entity_quirks
if (definition := entity_quirk.definition_fn(device))
]
async def async_setup_entry(
hass: HomeAssistant,
entry: TuyaConfigEntry,
@@ -58,13 +42,10 @@ async def async_setup_entry(
entities: list[TuyaCameraEntity] = []
for device_id in device_ids:
device = manager.device_map[device_id]
if (quirk_entities := _get_quirk_entities(manager, device)) is not None:
entities.extend(quirk_entities)
continue
if description := CAMERAS.get(device.category):
entities.append(
TuyaCameraEntity(
device, manager, get_default_definition(device), description
device, manager, description, get_default_definition(device)
)
)
@@ -88,10 +69,8 @@ class TuyaCameraEntity(TuyaEntity, CameraEntity):
self,
device: CustomerDevice,
device_manager: Manager,
description: CameraEntityDescription,
definition: TuyaCameraDefinition,
description: CameraEntityDescription | None = None,
*,
quirk: CameraQuirk | None = None,
) -> None:
"""Init Tuya Camera."""
super().__init__(device, device_manager, description)
@@ -99,8 +78,6 @@ class TuyaCameraEntity(TuyaEntity, CameraEntity):
self._attr_model = device.product_name
self._motion_detection_switch = definition.motion_detection_switch
self._recording_status = definition.recording_status
if quirk and quirk.key:
self._attr_unique_id = f"tuya.{device.id}_{quirk.key}"
@property
def is_recording(self) -> bool:
+3 -5
View File
@@ -24,13 +24,11 @@ class TuyaEntity(Entity):
self,
device: CustomerDevice,
device_manager: Manager,
description: EntityDescription | None,
description: EntityDescription,
) -> None:
"""Init TuyaEntity."""
self._attr_unique_id = f"tuya.{device.id}"
if description:
self.entity_description = description
self._attr_unique_id = f"tuya.{device.id}{description.key}"
self._attr_unique_id = f"tuya.{device.id}{description.key}"
self.entity_description = description
# TuyaEntity initialize mq can subscribe
device.set_up = True
self.device = device
+1 -1
View File
@@ -44,7 +44,7 @@
"iot_class": "cloud_push",
"loggers": ["tuya_sharing"],
"requirements": [
"tuya-device-handlers==0.0.17",
"tuya-device-handlers==0.0.18",
"tuya-device-sharing-sdk==0.2.8"
]
}
@@ -8,6 +8,7 @@ from dataclasses import dataclass, replace
import logging
import math
from typing import Any, cast
import unicodedata
from unifi_access_api import (
ApiAuthError,
@@ -24,13 +25,16 @@ from unifi_access_api import (
WsMessageHandler,
)
from unifi_access_api.models.websocket import (
DeviceUpdate,
HwDoorbell,
InsightsAdd,
LocationUpdateState,
LocationUpdateV2,
LogAdd,
RemoteView,
SettingUpdate,
ThumbnailInfo,
V2DeviceUpdate,
V2LocationState,
V2LocationUpdate,
WebsocketMessage,
@@ -172,7 +176,10 @@ class UnifiAccessCoordinator(DataUpdateCoordinator[UnifiAccessData]):
handlers: dict[str, WsMessageHandler] = {
"access.data.device.location_update_v2": self._handle_location_update,
"access.data.v2.location.update": self._handle_v2_location_update,
"access.data.v2.device.update": self._handle_v2_device_update,
"access.data.device.update": self._handle_device_update,
"access.hw.door_bell": self._handle_doorbell,
"access.remote_view": self._handle_remote_view,
"access.logs.insights.add": self._handle_insights_add,
"access.logs.add": self._handle_logs_add,
"access.data.setting.update": self._handle_setting_update,
@@ -345,12 +352,13 @@ class UnifiAccessCoordinator(DataUpdateCoordinator[UnifiAccessData]):
updated_lock_rule = current_lock_rule
lock_rule_updated = False
if ws_state is not None:
if ws_state.dps is not None:
if "dps" in ws_state.model_fields_set and ws_state.dps is not None:
updates["door_position_status"] = ws_state.dps
if ws_state.lock == "locked":
updates["door_lock_relay_status"] = DoorLockRelayStatus.LOCK
elif ws_state.lock == "unlocked":
updates["door_lock_relay_status"] = DoorLockRelayStatus.UNLOCK
if "lock" in ws_state.model_fields_set:
if ws_state.lock == "locked":
updates["door_lock_relay_status"] = DoorLockRelayStatus.LOCK
elif ws_state.lock == "unlocked":
updates["door_lock_relay_status"] = DoorLockRelayStatus.UNLOCK
if "remain_lock" in ws_state.model_fields_set:
lock_rule_updated = True
@@ -428,6 +436,51 @@ class UnifiAccessCoordinator(DataUpdateCoordinator[UnifiAccessData]):
{},
)
async def _handle_remote_view(self, msg: WebsocketMessage) -> None:
"""Handle remote view (video intercom doorbell press) events."""
remote_view = cast(RemoteView, msg)
device_id = remote_view.data.device_id
if device_id and device_id in self._device_to_door:
self._dispatch_door_event(
self._device_to_door[device_id], "doorbell", "ring", {}
)
return
door_name = remote_view.data.door_name
if self.data and door_name:
normalized = unicodedata.normalize("NFC", door_name.strip())
for door in self.data.doors.values():
if unicodedata.normalize("NFC", door.name.strip()) == normalized:
self._dispatch_door_event(door.id, "doorbell", "ring", {})
return
_LOGGER.debug(
"Received access.remote_view for unknown device %s (door '%s')",
device_id,
door_name,
)
async def _handle_v2_device_update(self, msg: WebsocketMessage) -> None:
"""Handle V2 device update messages."""
update = cast(V2DeviceUpdate, msg)
device_id = update.data.id
if not device_id:
return
first_valid_door_id: str | None = None
for loc_state in update.data.location_states:
door_id = loc_state.location_id
if not door_id:
continue
if first_valid_door_id is None:
first_valid_door_id = door_id
self._process_door_update(door_id, loc_state)
if first_valid_door_id is not None:
self._device_to_door[device_id] = first_valid_door_id
async def _handle_device_update(self, msg: WebsocketMessage) -> None:
"""Handle device update messages."""
update = cast(DeviceUpdate, msg)
if update.data.unique_id and update.data.door and update.data.door.unique_id:
self._device_to_door[update.data.unique_id] = update.data.door.unique_id
async def _handle_insights_add(self, msg: WebsocketMessage) -> None:
"""Handle access insights events (entry/exit)."""
insights = cast(InsightsAdd, msg)
@@ -9,5 +9,5 @@
"iot_class": "local_push",
"loggers": ["unifi_access_api"],
"quality_scale": "silver",
"requirements": ["py-unifi-access==1.2.0"]
"requirements": ["py-unifi-access==1.3.0"]
}
@@ -107,6 +107,8 @@ class UpCloudOptionsFlow(OptionsFlow):
data_schema = vol.Schema(
{
# Polling interval is user-configurable, which is no longer allowed
# pylint: disable-next=hass-config-flow-polling-field
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(CONF_SCAN_INTERVAL)
@@ -49,7 +49,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: WhirlpoolConfigEntry) ->
translation_domain=DOMAIN, translation_key="invalid_auth"
)
# A potato is not an appliance
appliances_manager = AppliancesManager(backend_selector, auth, session)
if not await appliances_manager.fetch_appliances():
raise ConfigEntryNotReady(
+146 -60
View File
@@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Iterable, Mapping, Sequence
from compression import zstd
from contextlib import suppress
from copy import deepcopy
import inspect
@@ -48,6 +49,34 @@ STORAGE_MANAGER: HassKey[_StoreManager] = HassKey("storage_manager")
MANAGER_CLEANUP_DELAY = 60
def _load_json_file(path: str | Path) -> json_util.JsonValueType:
"""Load JSON from a file, transparently decompressing .zst files.
Returns ``{}`` (the same sentinel as :func:`json_util.load_json`) when
the file does not exist. Raises :class:`HomeAssistantError` wrapping the
original exception when the file is corrupt or cannot be read.
"""
if not str(path).endswith(".zst"):
return json_util.load_json(path)
try:
with open(path, "rb") as fh:
raw = zstd.decompress(fh.read())
except FileNotFoundError:
_LOGGER.debug("JSON file not found: %s", path)
return {}
except zstd.ZstdError as err:
_LOGGER.exception("Could not decompress storage file: %s", path)
raise HomeAssistantError(f"Error decompressing {path}: {err}") from err
except OSError as err:
_LOGGER.exception("Storage file reading failed: %s", path)
raise HomeAssistantError(f"Error while loading {path}: {err}") from err
try:
return json_util.json_loads(raw)
except json_util.JSON_DECODE_EXCEPTIONS as err:
_LOGGER.exception("Could not parse JSON content: %s", path)
raise HomeAssistantError(f"Error while loading {path}: {err}") from err
async def async_migrator[_T: Mapping[str, Any] | Sequence[Any]](
hass: HomeAssistant,
old_path: str,
@@ -214,7 +243,7 @@ class _StoreManager:
storage_file: Path = storage_path.joinpath(key)
try:
if storage_file.is_file():
data_preload[key] = json_util.load_json(storage_file)
data_preload[key] = _load_json_file(storage_file)
except Exception as ex: # noqa: BLE001
_LOGGER.debug("Error loading %s: %s", key, ex)
@@ -239,6 +268,7 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
max_readable_version: int | None = None,
minor_version: int = 1,
read_only: bool = False,
compress: bool = False,
serialize_in_event_loop: bool = True,
) -> None:
"""Initialize storage class.
@@ -282,10 +312,36 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
self._next_write_time = 0.0
self._manager = get_internal_store_manager(hass)
self._serialize_in_event_loop = serialize_in_event_loop
self._compress = compress
@cached_property
def path(self):
"""Return the config path."""
if self._compress:
return self.hass.config.path(STORAGE_DIR, self.key + ".zst")
return self.hass.config.path(STORAGE_DIR, self.key)
@cached_property
def _cache_key(self):
"""Return the cache key used with _StoreManager.
For compressed stores the file on disk is named ``key.zst``, which is
what ``_StoreManager._files`` contains. Using the bare ``key`` would
always produce a "file does not exist" cache hit, so we append the
suffix here to match the real filename.
"""
if self._compress:
return self.key + ".zst"
return self.key
@cached_property
def _uncompressed_path(self):
"""Return the plain (uncompressed) config path.
Used as a fallback when compress=True but only an uncompressed file
exists (e.g. the user manually extracted / edited the file), and to
clean up the old file after a successful compressed write.
"""
return self.hass.config.path(STORAGE_DIR, self.key)
def make_read_only(self) -> None:
@@ -359,66 +415,19 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
# We make a copy because code might assume it's safe to mutate loaded data
# and we don't want that to mess with what we're trying to store.
data = deepcopy(data)
elif cache := self._manager.async_fetch(self.key):
elif cache := self._manager.async_fetch(self._cache_key):
exists, data = cache
if not exists:
return None
else:
try:
data = await self.hass.async_add_executor_job(
json_util.load_json, self.path
)
data = await self.hass.async_add_executor_job(self._load_data_from_disk)
except HomeAssistantError as err:
if isinstance(err.__cause__, JSONDecodeError):
# If we have a JSONDecodeError, it means the file is corrupt.
# We can't recover from this, so we'll log an error, rename the file and
# return None so that we can start with a clean slate which will
# allow startup to continue so they can restore from a backup.
isotime = dt_util.utcnow().isoformat()
corrupt_postfix = f".corrupt.{isotime}"
corrupt_path = f"{self.path}{corrupt_postfix}"
await self.hass.async_add_executor_job(
os.rename, self.path, corrupt_path
)
storage_key = self.key
_LOGGER.error(
"Unrecoverable error decoding storage %s at %s; "
"This may indicate an unclean shutdown, invalid syntax "
"from manual edits, or disk corruption; "
"The corrupt file has been saved as %s; "
"It is recommended to restore from backup: %s",
storage_key,
self.path,
corrupt_path,
err,
)
from .issue_registry import ( # noqa: PLC0415
IssueSeverity,
async_create_issue,
)
issue_domain = HOMEASSISTANT_DOMAIN
if (
domain := (storage_key.partition(".")[0])
) and domain in self.hass.config.components:
issue_domain = domain
async_create_issue(
self.hass,
HOMEASSISTANT_DOMAIN,
f"storage_corruption_{storage_key}_{isotime}",
is_fixable=True,
issue_domain=issue_domain,
translation_key="storage_corruption",
is_persistent=True,
severity=IssueSeverity.CRITICAL,
translation_placeholders={
"storage_key": storage_key,
"original_path": self.path,
"corrupt_path": corrupt_path,
"error": str(err),
},
)
if isinstance(err.__cause__, (JSONDecodeError, zstd.ZstdError)):
# If the file is corrupt we log an error, rename it, and
# return None so startup can continue from a clean slate.
# The caller can restore from a backup.
await self._async_handle_corrupt_file(err)
return None
raise
@@ -570,7 +579,7 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
async def _async_handle_write_data(self, *_args):
"""Handle writing the config."""
async with self._write_lock:
self._manager.async_invalidate(self.key)
self._manager.async_invalidate(self._cache_key)
self._async_cleanup_delay_listener()
self._async_cleanup_final_write_listener()
@@ -607,6 +616,22 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
mode, json_data = json_helper.prepare_save_json(data, encoder=self._encoder)
self._write_prepared_data(mode, json_data)
def _load_data_from_disk(self) -> json_util.JsonValueType:
"""Load data from disk.
Called in the executor. For compressed stores the compressed path is
tried first; if it does not exist the plain file is used as a fallback
so that a user-edited uncompressed file is transparently picked up.
Returns ``{}`` (the same sentinel as :func:`json_util.load_json`) when
neither file exists.
"""
data = _load_json_file(self.path)
if data == {} and self._compress:
# .zst not found fall back to the plain file.
data = _load_json_file(self._uncompressed_path)
return data
def _write_prepared_data(self, mode: str, json_data: str | bytes) -> None:
"""Write the data."""
path = self.path
@@ -616,7 +641,62 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
write_method = (
write_utf8_file_atomic if self._atomic_writes else write_utf8_file
)
write_method(path, json_data, self._private, mode=mode)
if self._compress:
# Ensure we have bytes before compressing.
if isinstance(json_data, str):
json_data = json_data.encode("utf-8")
compressed = zstd.compress(json_data)
write_method(path, compressed, self._private, mode="wb")
# Remove the old uncompressed file (migration from plain → compressed).
uncompressed = self._uncompressed_path
if os.path.isfile(uncompressed):
os.unlink(uncompressed)
else:
write_method(path, json_data, self._private, mode=mode)
async def _async_handle_corrupt_file(self, err: Exception) -> None:
"""Rename a corrupt storage file and create a repair issue."""
from .issue_registry import IssueSeverity, async_create_issue # noqa: PLC0415
isotime = dt_util.utcnow().isoformat()
corrupt_postfix = f".corrupt.{isotime}"
corrupt_path = f"{self.path}{corrupt_postfix}"
await self.hass.async_add_executor_job(os.rename, self.path, corrupt_path)
storage_key = self.key
_LOGGER.error(
"Unrecoverable error decoding storage %s at %s; "
"This may indicate an unclean shutdown, invalid syntax "
"from manual edits, or disk corruption; "
"The corrupt file has been saved as %s; "
"It is recommended to restore from backup: %s",
storage_key,
self.path,
corrupt_path,
err,
)
issue_domain = HOMEASSISTANT_DOMAIN
if (
domain := (storage_key.partition(".")[0])
) and domain in self.hass.config.components:
issue_domain = domain
async_create_issue(
self.hass,
HOMEASSISTANT_DOMAIN,
f"storage_corruption_{storage_key}_{isotime}",
is_fixable=True,
issue_domain=issue_domain,
translation_key="storage_corruption",
is_persistent=True,
severity=IssueSeverity.CRITICAL,
translation_placeholders={
"storage_key": storage_key,
"original_path": self.path,
"corrupt_path": corrupt_path,
"error": str(err),
},
)
async def _async_migrate_func(self, old_major_version, old_minor_version, old_data):
"""Migrate to the new version."""
@@ -624,9 +704,15 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
async def async_remove(self) -> None:
"""Remove all data."""
self._manager.async_invalidate(self.key)
self._manager.async_invalidate(self._cache_key)
self._async_cleanup_delay_listener()
self._async_cleanup_final_write_listener()
with suppress(FileNotFoundError):
await self.hass.async_add_executor_job(os.unlink, self.path)
def _remove_files() -> None:
with suppress(FileNotFoundError):
os.unlink(self.path)
if self._compress:
with suppress(FileNotFoundError):
os.unlink(self._uncompressed_path)
await self.hass.async_add_executor_job(_remove_files)
+1
View File
@@ -499,6 +499,7 @@ class EntityTriggerBase(Trigger):
)
if not self._duration:
# Call action immediately if duration is not specified or 0
call_action()
return
+1 -1
View File
@@ -70,7 +70,7 @@ standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0
ulid-transform==2.2.0
urllib3>=2.0
uv==0.11.6
uv==0.11.7
voluptuous-openapi==0.3.0
voluptuous-serialize==2.7.0
voluptuous==0.15.2
@@ -0,0 +1,103 @@
"""Plugin for detecting polling interval fields in config flow schemas.
Polling intervals (scan_interval, update_interval, etc.) should be fixed
by the integration, not exposed as user-configurable fields. The integration
author determines the appropriate polling frequency based on API rate limits,
device capabilities, and data freshness needs.
Found in 3.5% of new-integration PRs across 1,100+ analyzed PRs, April 2026.
"""
from __future__ import annotations
from astroid import nodes
from pylint.checkers import BaseChecker
from pylint.lint import PyLinter
# Field names that indicate a polling/scan interval.
_POLLING_FIELD_NAMES: frozenset[str] = frozenset(
{
"CONF_POLLING_INTERVAL",
"CONF_REFRESH_INTERVAL",
"CONF_SCAN_INTERVAL",
"CONF_UPDATE_FREQUENCY",
"CONF_UPDATE_INTERVAL",
"polling_interval",
"refresh_interval",
"scan_interval",
"update_frequency",
"update_interval",
}
)
class HassEnforceConfigFlowNoPollingChecker(BaseChecker):
"""Checker for polling interval fields in config flow schemas.
Config flows should not include polling interval fields
(CONF_SCAN_INTERVAL, "scan_interval", "update_interval", etc.) -- these
should be fixed by the integration author, not user-configurable.
"""
name = "hass_enforce_config_flow_no_polling"
priority = -1
msgs = {
"W7492": (
"Config flow should not include a '%s' field -- polling intervals "
"should be fixed by the integration, not user-configurable "
"(https://developers.home-assistant.io/docs/core/"
"integration-quality-scale/rules/appropriate-polling)",
"hass-config-flow-polling-field",
"Used when a config flow schema includes a polling/scan interval "
"field. The integration author should determine the appropriate "
"polling frequency based on API rate limits and data freshness "
"needs. Polling intervals should not be user-configurable. "
"See https://developers.home-assistant.io/docs/core/"
"integration-quality-scale/rules/appropriate-polling",
),
}
options = ()
def visit_call(self, node: nodes.Call) -> None:
"""Check for polling interval fields in vol.Required/Optional calls."""
root_name = node.root().name
if not root_name.startswith("homeassistant.components."):
return
parts = root_name.split(".")
current_module = parts[3] if len(parts) > 3 else ""
if current_module != "config_flow":
return
field_name = _get_schema_field_name(node)
if field_name is None:
return
if field_name in _POLLING_FIELD_NAMES:
self.add_message(
"hass-config-flow-polling-field",
node=node,
args=(field_name,),
)
def _get_schema_field_name(node: nodes.Call) -> str | None:
"""Extract the field name from vol.Required(...) or vol.Optional(...)."""
if not isinstance(node.func, nodes.Attribute):
return None
if node.func.attrname not in {"Required", "Optional"}:
return None
if not node.args:
return None
first_arg = node.args[0]
if isinstance(first_arg, nodes.Const) and isinstance(first_arg.value, str):
return first_arg.value
if isinstance(first_arg, nodes.Name):
return str(first_arg.name)
return None
def register(linter: PyLinter) -> None:
"""Register the checker."""
linter.register_checker(HassEnforceConfigFlowNoPollingChecker(linter))
+2 -1
View File
@@ -74,7 +74,7 @@ dependencies = [
"typing-extensions>=4.15.0,<5.0",
"ulid-transform==2.2.0",
"urllib3>=2.0",
"uv==0.11.6",
"uv==0.11.7",
"voluptuous==0.15.2",
"voluptuous-serialize==2.7.0",
"voluptuous-openapi==0.3.0",
@@ -121,6 +121,7 @@ load-plugins = [
"hass_async_load_fixtures",
"hass_decorator",
"hass_enforce_class_module",
"hass_enforce_config_flow_no_polling",
"hass_enforce_greek_micro_char",
"hass_enforce_runtime_data",
"hass_enforce_sorted_platforms",
+1 -1
View File
@@ -54,7 +54,7 @@ standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0
ulid-transform==2.2.0
urllib3>=2.0
uv==0.11.6
uv==0.11.7
voluptuous-openapi==0.3.0
voluptuous-serialize==2.7.0
voluptuous==0.15.2
+2 -2
View File
@@ -1908,7 +1908,7 @@ py-sucks==0.9.11
py-synologydsm-api==2.7.3
# homeassistant.components.unifi_access
py-unifi-access==1.2.0
py-unifi-access==1.3.0
# homeassistant.components.atome
pyAtome==0.1.1
@@ -3164,7 +3164,7 @@ ttls==1.8.3
ttn_client==1.3.0
# homeassistant.components.tuya
tuya-device-handlers==0.0.17
tuya-device-handlers==0.0.18
# homeassistant.components.tuya
tuya-device-sharing-sdk==0.2.8
+2 -2
View File
@@ -1660,7 +1660,7 @@ py-sucks==0.9.11
py-synologydsm-api==2.7.3
# homeassistant.components.unifi_access
py-unifi-access==1.2.0
py-unifi-access==1.3.0
# homeassistant.components.hdmi_cec
pyCEC==0.5.2
@@ -2685,7 +2685,7 @@ ttls==1.8.3
ttn_client==1.3.0
# homeassistant.components.tuya
tuya-device-handlers==0.0.17
tuya-device-handlers==0.0.18
# homeassistant.components.tuya
tuya-device-sharing-sdk==0.2.8
@@ -21,6 +21,7 @@ from tests.components.common import (
assert_condition_behavior_all,
assert_condition_behavior_any,
assert_condition_gated_by_labs_flag,
assert_condition_options_supported,
assert_numerical_condition_unit_conversion,
parametrize_condition_states_all,
parametrize_condition_states_any,
@@ -80,6 +81,73 @@ async def test_air_quality_conditions_gated_by_labs_flag(
await assert_condition_gated_by_labs_flag(hass, caplog, condition)
_PLAIN_THRESHOLD = {"threshold": {"type": "above", "value": {"number": 50}}}
_PPB_THRESHOLD = {
"threshold": {
"type": "above",
"value": {
"number": 50,
"unit_of_measurement": CONCENTRATION_PARTS_PER_BILLION,
},
}
}
_UGM3_THRESHOLD = {
"threshold": {
"type": "above",
"value": {
"number": 50,
"unit_of_measurement": CONCENTRATION_MICROGRAMS_PER_CUBIC_METER,
},
}
}
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("condition_key", "base_options", "supports_behavior", "supports_duration"),
[
# State-based conditions
("air_quality.is_gas_detected", {}, True, True),
("air_quality.is_gas_cleared", {}, True, True),
("air_quality.is_co_detected", {}, True, True),
("air_quality.is_co_cleared", {}, True, True),
("air_quality.is_smoke_detected", {}, True, True),
("air_quality.is_smoke_cleared", {}, True, True),
# Numerical conditions with unit conversion (μg/m³ base)
("air_quality.is_co_value", _UGM3_THRESHOLD, True, False),
("air_quality.is_ozone_value", _UGM3_THRESHOLD, True, False),
("air_quality.is_voc_value", _UGM3_THRESHOLD, True, False),
("air_quality.is_no_value", _UGM3_THRESHOLD, True, False),
("air_quality.is_no2_value", _UGM3_THRESHOLD, True, False),
("air_quality.is_so2_value", _UGM3_THRESHOLD, True, False),
# Numerical conditions with unit conversion (ppb base)
("air_quality.is_voc_ratio_value", _PPB_THRESHOLD, True, False),
# Numerical conditions without unit conversion
("air_quality.is_co2_value", _PLAIN_THRESHOLD, True, False),
("air_quality.is_pm1_value", _PLAIN_THRESHOLD, True, False),
("air_quality.is_pm25_value", _PLAIN_THRESHOLD, True, False),
("air_quality.is_pm4_value", _PLAIN_THRESHOLD, True, False),
("air_quality.is_pm10_value", _PLAIN_THRESHOLD, True, False),
("air_quality.is_n2o_value", _PLAIN_THRESHOLD, True, False),
],
)
async def test_air_quality_condition_options_validation(
hass: HomeAssistant,
condition_key: str,
base_options: dict[str, Any] | None,
supports_behavior: bool,
supports_duration: bool,
) -> None:
"""Test that air_quality conditions support the expected options."""
await assert_condition_options_supported(
hass,
condition_key,
base_options,
supports_behavior=supports_behavior,
supports_duration=supports_duration,
)
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("condition_target_config", "entity_id", "entities_in_target"),
+60
View File
@@ -1096,6 +1096,66 @@ async def assert_trigger_gated_by_labs_flag(
) in caplog.text
async def _validate_condition_options(
hass: HomeAssistant,
condition: str,
options: dict[str, Any] | None,
*,
valid: bool,
) -> None:
"""Assert that a condition accepts or rejects the given options during validation."""
config: dict[str, Any] = {
CONF_CONDITION: condition,
CONF_TARGET: {ATTR_LABEL_ID: "test_label"},
}
if options is not None:
config[CONF_OPTIONS] = options
if valid:
await async_validate_condition_config(hass, config)
else:
with pytest.raises(vol.Invalid):
await async_validate_condition_config(hass, config)
async def assert_condition_options_supported(
hass: HomeAssistant,
condition: str,
base_options: dict[str, Any] | None,
*,
supports_behavior: bool,
supports_duration: bool,
) -> None:
"""Assert which options a condition supports.
Tests that the condition:
- Accepts the minimal config (base_options)
- Accepts/rejects behavior depending on supports_behavior
- Accepts/rejects duration depending on supports_duration
- Rejects unknown options
"""
# Minimal config should always be valid
await _validate_condition_options(hass, condition, base_options, valid=True)
def _merge(extra: dict[str, Any]) -> dict[str, Any]:
return {**(base_options or {}), **extra}
# Behavior
for behavior in ("any", "all"):
await _validate_condition_options(
hass, condition, _merge({"behavior": behavior}), valid=supports_behavior
)
# Duration
await _validate_condition_options(
hass, condition, _merge({"for": {"seconds": 5}}), valid=supports_duration
)
# Unknown option should always be rejected
await _validate_condition_options(
hass, condition, _merge({"unknown_option": True}), valid=False
)
async def _validate_trigger_options(
hass: HomeAssistant,
trigger: str,
@@ -0,0 +1,220 @@
"""Tests for the ESPHome serial proxy helper."""
from __future__ import annotations
from unittest.mock import AsyncMock, call, patch
from aioesphomeapi import APIClient
from aioesphomeapi.model import SerialProxyInfo, SerialProxyPortType
import pytest
from serialx.platforms.serial_esphome import InvalidSettingsError
from yarl import URL
from homeassistant.components.esphome import _async_scan_serial_ports, serial_proxy
from homeassistant.components.esphome.const import DOMAIN
from homeassistant.components.usb import SerialDevice
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from .conftest import MockESPHomeDeviceType
from tests.common import MockConfigEntry
def test_build_url_basic() -> None:
"""Build a URL with a simple port name."""
url = serial_proxy.build_url("abc123DEF456", "uart0")
assert url == URL("esphome-hass://esphome/abc123DEF456?port_name=uart0")
def test_build_url_escapes_port_name() -> None:
"""Port names with special characters are URL-encoded."""
url = serial_proxy.build_url("abc123", "uart 0/main")
# Round-trip via yarl recovers the original port name
assert URL(str(url)).query["port_name"] == "uart 0/main"
@pytest.mark.usefixtures("mock_zeroconf")
async def test_async_setup_stores_event_loop(
hass: HomeAssistant,
) -> None:
"""async_setup registers hass.loop on the serial_proxy module."""
assert await async_setup_component(hass, DOMAIN, {})
assert serial_proxy._HASS_LOOP is hass.loop
async def test_resolve_client_unknown_entry(hass: HomeAssistant) -> None:
"""An unknown entry_id raises InvalidSettingsError."""
with (
patch.object(serial_proxy, "async_get_hass", return_value=hass),
pytest.raises(InvalidSettingsError),
):
await serial_proxy._resolve_client("does-not-exist")
async def test_resolve_client_wrong_domain(hass: HomeAssistant) -> None:
"""A config entry from a different domain raises InvalidSettingsError."""
entry = MockConfigEntry(domain="other", data={})
entry.add_to_hass(hass)
with (
patch.object(serial_proxy, "async_get_hass", return_value=hass),
pytest.raises(InvalidSettingsError),
):
await serial_proxy._resolve_client(entry.entry_id)
async def test_resolve_client_unloaded_entry(hass: HomeAssistant) -> None:
"""An ESPHome entry that isn't loaded raises InvalidSettingsError."""
entry = MockConfigEntry(domain=DOMAIN, data={})
entry.add_to_hass(hass)
with (
patch.object(serial_proxy, "async_get_hass", return_value=hass),
pytest.raises(InvalidSettingsError),
):
await serial_proxy._resolve_client(entry.entry_id)
@pytest.mark.usefixtures("mock_zeroconf")
async def test_resolve_client_loaded_entry(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
) -> None:
"""A loaded ESPHome entry returns its APIClient."""
device = await mock_esphome_device(mock_client=mock_client)
with patch.object(serial_proxy, "async_get_hass", return_value=hass):
client = await serial_proxy._resolve_client(device.entry.entry_id)
assert client is mock_client
@pytest.mark.usefixtures("mock_zeroconf")
async def test_scan_serial_ports_no_entries(hass: HomeAssistant) -> None:
"""No loaded ESPHome entries yields no ports."""
assert _async_scan_serial_ports(hass) == []
@pytest.mark.usefixtures("mock_zeroconf")
async def test_scan_serial_ports_happy_path(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
) -> None:
"""A loaded entry with serial proxies emits a SerialDevice per proxy."""
device = await mock_esphome_device(
mock_client=mock_client,
device_info={
"mac_address": "AA:BB:CC:DD:EE:FF",
"manufacturer": "Espressif",
"model": "ESP32",
"serial_proxies": [
SerialProxyInfo(name="Left Port", port_type=SerialProxyPortType.TTL),
SerialProxyInfo(name="Right Port", port_type=SerialProxyPortType.TTL),
],
},
)
ports = _async_scan_serial_ports(hass)
entry_id = device.entry.entry_id
assert ports == [
SerialDevice(
device=str(serial_proxy.build_url(entry_id, "Left Port")),
serial_number="AABBCCDDEEFF-left_port",
manufacturer="Espressif",
description="ESP32 (Left Port)",
),
SerialDevice(
device=str(serial_proxy.build_url(entry_id, "Right Port")),
serial_number="AABBCCDDEEFF-right_port",
manufacturer="Espressif",
description="ESP32 (Right Port)",
),
]
@pytest.mark.usefixtures("mock_zeroconf")
async def test_scan_serial_ports_skips_unavailable(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
) -> None:
"""Unavailable entries are skipped by the scanner."""
device = await mock_esphome_device(
mock_client=mock_client,
device_info={
"serial_proxies": [
SerialProxyInfo(name="uart0", port_type=SerialProxyPortType.TTL)
],
},
)
# Mark the entry as unavailable
device.entry.runtime_data.available = False
assert _async_scan_serial_ports(hass) == []
@pytest.mark.usefixtures("mock_zeroconf")
async def test_async_open_missing_host(hass: HomeAssistant) -> None:
"""A URL with an invalid entry_id raises InvalidSettingsError."""
assert await async_setup_component(hass, DOMAIN, {})
proxy = serial_proxy.HassESPHomeSerial("esphome-hass://unknown/?port_name=uart0")
with pytest.raises(InvalidSettingsError):
await proxy._async_open()
@pytest.mark.usefixtures("mock_zeroconf")
async def test_async_open_missing_port_name(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
) -> None:
"""A URL with a missing port name raises InvalidSettingsError."""
assert await async_setup_component(hass, DOMAIN, {})
device = await mock_esphome_device(
mock_client=mock_client,
device_info={
"mac_address": "AA:BB:CC:DD:EE:FF",
"manufacturer": "Espressif",
"model": "ESP32",
"serial_proxies": [
SerialProxyInfo(name="uart0", port_type=SerialProxyPortType.TTL),
],
},
)
entry_id = device.entry.entry_id
proxy = serial_proxy.HassESPHomeSerial(f"esphome-hass://{entry_id}")
with pytest.raises(InvalidSettingsError):
await proxy._async_open()
@pytest.mark.usefixtures("mock_zeroconf")
async def test_async_open_happy_path(
hass: HomeAssistant,
mock_client: APIClient,
mock_esphome_device: MockESPHomeDeviceType,
) -> None:
"""Happy path sets _api from the loaded entry and applies port_name from query."""
device = await mock_esphome_device(mock_client=mock_client)
mock_client._loop = hass.loop
url = str(serial_proxy.build_url(device.entry.entry_id, "uart0"))
proxy = serial_proxy.HassESPHomeSerial(url)
with patch(
"homeassistant.components.esphome.serial_proxy.ESPHomeSerial._async_open",
AsyncMock(),
) as mock_super_open:
await proxy._async_open()
assert proxy._api is mock_client
assert proxy._port_name == "uart0"
assert proxy._client_loop is hass.loop
assert mock_super_open.mock_calls == [call()]
@@ -29,7 +29,7 @@
'object_id_base': 'Discharge limit',
'options': dict({
}),
'original_device_class': <NumberDeviceClass.BATTERY: 'battery'>,
'original_device_class': None,
'original_icon': None,
'original_name': 'Discharge limit',
'platform': 'indevolt',
@@ -44,7 +44,6 @@
# name: test_number[2][number.cms_sf2000_discharge_limit-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'battery',
'friendly_name': 'CMS-SF2000 Discharge limit',
'max': 100,
'min': 0,
@@ -645,6 +645,57 @@
'state': 'main',
})
# ---
# name: test_sensor[1][sensor.bk1600_discharge_limit-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.bk1600_discharge_limit',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Discharge limit',
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Discharge limit',
'platform': 'indevolt',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'discharge_limit',
'unique_id': 'BK1600-12345678_6105',
'unit_of_measurement': '%',
})
# ---
# name: test_sensor[1][sensor.bk1600_discharge_limit-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'BK1600 Discharge limit',
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'sensor.bk1600_discharge_limit',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '5',
})
# ---
# name: test_sensor[1][sensor.bk1600_energy_mode-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
@@ -767,64 +818,6 @@
'state': '0',
})
# ---
# name: test_sensor[1][sensor.bk1600_rated_capacity-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.TOTAL_INCREASING: 'total_increasing'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.bk1600_rated_capacity',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Rated capacity',
'options': dict({
'sensor': dict({
'suggested_display_precision': 2,
}),
}),
'original_device_class': <SensorDeviceClass.ENERGY: 'energy'>,
'original_icon': None,
'original_name': 'Rated capacity',
'platform': 'indevolt',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'rated_capacity',
'unique_id': 'BK1600-12345678_6105',
'unit_of_measurement': <UnitOfEnergy.KILO_WATT_HOUR: 'kWh'>,
})
# ---
# name: test_sensor[1][sensor.bk1600_rated_capacity-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'energy',
'friendly_name': 'BK1600 Rated capacity',
'state_class': <SensorStateClass.TOTAL_INCREASING: 'total_increasing'>,
'unit_of_measurement': <UnitOfEnergy.KILO_WATT_HOUR: 'kWh'>,
}),
'context': <ANY>,
'entity_id': 'sensor.bk1600_rated_capacity',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '5',
})
# ---
# name: test_sensor[1][sensor.bk1600_total_ac_input_energy-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
@@ -9,8 +9,8 @@ import pytest
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import keenetic_ndms2 as keenetic
from homeassistant.components.keenetic_ndms2 import CONF_INTERFACES, const
from homeassistant.components.keenetic_ndms2.const import DOMAIN
from homeassistant.const import CONF_HOST, CONF_SOURCE
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
@@ -62,7 +62,7 @@ async def test_flow_works(hass: HomeAssistant, connect) -> None:
"""Test config flow."""
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN, context={"source": config_entries.SOURCE_USER}
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
@@ -84,7 +84,7 @@ async def test_flow_works(hass: HomeAssistant, connect) -> None:
async def test_reconfigure(hass: HomeAssistant, connect) -> None:
"""Test reconfigure flow."""
entry = MockConfigEntry(domain=keenetic.DOMAIN, data=MOCK_DATA)
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA)
entry.add_to_hass(hass)
result = await entry.start_reconfigure_flow(hass)
@@ -112,7 +112,7 @@ async def test_reconfigure(hass: HomeAssistant, connect) -> None:
async def test_options(hass: HomeAssistant) -> None:
"""Test updating options."""
entry = MockConfigEntry(domain=keenetic.DOMAIN, data=MOCK_DATA)
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA)
entry.add_to_hass(hass)
with patch(
"homeassistant.components.keenetic_ndms2.async_setup_entry", return_value=True
@@ -151,13 +151,11 @@ async def test_options(hass: HomeAssistant) -> None:
async def test_host_already_configured(hass: HomeAssistant, connect) -> None:
"""Test host already configured."""
entry = MockConfigEntry(
domain=keenetic.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
)
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS)
entry.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN, context={"source": config_entries.SOURCE_USER}
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result2 = await hass.config_entries.flow.async_configure(
@@ -172,7 +170,7 @@ async def test_connection_error(hass: HomeAssistant, connect_error) -> None:
"""Test error when connection is unsuccessful."""
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN, context={"source": config_entries.SOURCE_USER}
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_DATA
@@ -184,7 +182,7 @@ async def test_connection_error(hass: HomeAssistant, connect_error) -> None:
async def test_options_not_initialized(hass: HomeAssistant) -> None:
"""Test the error when the integration is not initialized."""
entry = MockConfigEntry(domain=keenetic.DOMAIN, data=MOCK_DATA)
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA)
entry.add_to_hass(hass)
# not setting entry.runtime_data
@@ -198,7 +196,7 @@ async def test_options_not_initialized(hass: HomeAssistant) -> None:
async def test_options_connection_error(hass: HomeAssistant) -> None:
"""Test updating options."""
entry = MockConfigEntry(domain=keenetic.DOMAIN, data=MOCK_DATA)
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA)
entry.add_to_hass(hass)
def get_interfaces_error():
@@ -218,7 +216,7 @@ async def test_options_connection_error(hass: HomeAssistant) -> None:
async def test_options_interface_filter(hass: HomeAssistant) -> None:
"""Test the case when the default Home interface is missing on the router."""
entry = MockConfigEntry(domain=keenetic.DOMAIN, data=MOCK_DATA)
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA)
entry.add_to_hass(hass)
# fake interfaces
@@ -250,7 +248,7 @@ async def test_ssdp_works(hass: HomeAssistant, connect) -> None:
discovery_info = dataclasses.replace(MOCK_SSDP_DISCOVERY_INFO)
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
@@ -279,14 +277,12 @@ async def test_ssdp_works(hass: HomeAssistant, connect) -> None:
async def test_ssdp_already_configured(hass: HomeAssistant) -> None:
"""Test host already configured and discovered."""
entry = MockConfigEntry(
domain=keenetic.DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS
)
entry = MockConfigEntry(domain=DOMAIN, data=MOCK_DATA, options=MOCK_OPTIONS)
entry.add_to_hass(hass)
discovery_info = dataclasses.replace(MOCK_SSDP_DISCOVERY_INFO)
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
@@ -299,7 +295,7 @@ async def test_ssdp_ignored(hass: HomeAssistant) -> None:
"""Test unique ID ignored and discovered."""
entry = MockConfigEntry(
domain=keenetic.DOMAIN,
domain=DOMAIN,
source=config_entries.SOURCE_IGNORE,
unique_id=MOCK_SSDP_DISCOVERY_INFO.upnp[ATTR_UPNP_UDN],
)
@@ -307,7 +303,7 @@ async def test_ssdp_ignored(hass: HomeAssistant) -> None:
discovery_info = dataclasses.replace(MOCK_SSDP_DISCOVERY_INFO)
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
@@ -320,7 +316,7 @@ async def test_ssdp_update_host(hass: HomeAssistant) -> None:
"""Test unique ID configured and discovered with the new host."""
entry = MockConfigEntry(
domain=keenetic.DOMAIN,
domain=DOMAIN,
data=MOCK_DATA,
options=MOCK_OPTIONS,
unique_id=MOCK_SSDP_DISCOVERY_INFO.upnp[ATTR_UPNP_UDN],
@@ -333,7 +329,7 @@ async def test_ssdp_update_host(hass: HomeAssistant) -> None:
discovery_info.ssdp_location = f"http://{new_ip}/"
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
@@ -351,7 +347,7 @@ async def test_ssdp_reject_no_udn(hass: HomeAssistant) -> None:
discovery_info.upnp.pop(ATTR_UPNP_UDN)
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
@@ -367,7 +363,7 @@ async def test_ssdp_reject_non_keenetic(hass: HomeAssistant) -> None:
discovery_info.upnp = {**discovery_info.upnp}
discovery_info.upnp[ATTR_UPNP_FRIENDLY_NAME] = "Suspicious device"
result = await hass.config_entries.flow.async_init(
keenetic.DOMAIN,
DOMAIN,
context={CONF_SOURCE: config_entries.SOURCE_SSDP},
data=discovery_info,
)
@@ -225,7 +225,13 @@ async def test_generate_data_with_attachments(
@pytest.mark.freeze_time("2025-06-14 22:59:00")
@pytest.mark.parametrize("configured_store", [False, True])
@pytest.mark.parametrize(
"image_model", ["gpt-image-1.5", "gpt-image-1", "gpt-image-1-mini"]
("image_model", "input_fidelity_present"),
[
("gpt-image-2", False),
("gpt-image-1.5", True),
("gpt-image-1", True),
("gpt-image-1-mini", False),
],
)
async def test_generate_image(
hass: HomeAssistant,
@@ -234,6 +240,7 @@ async def test_generate_image(
entity_registry: er.EntityRegistry,
issue_registry: ir.IssueRegistry,
image_model: str,
input_fidelity_present: bool,
configured_store: bool,
) -> None:
"""Test AI Task image generation."""
@@ -296,6 +303,14 @@ async def test_generate_image(
mock_upload_media.assert_called_once()
assert mock_create_stream.call_args is not None
assert mock_create_stream.call_args.kwargs["store"] is True
image_tool = next(
iter(
tool
for tool in mock_create_stream.call_args.kwargs["tools"]
if tool["type"] == "image_generation"
),
)
assert ("input_fidelity" in image_tool) == input_fidelity_present
image_data = mock_upload_media.call_args[0][1]
assert image_data.file.getvalue() == b"A"
assert image_data.content_type == "image/png"
@@ -1196,7 +1196,7 @@ async def test_creating_ai_task_subentry_advanced(
assert result4.get("data") == {
CONF_RECOMMENDED: False,
CONF_CHAT_MODEL: "gpt-4o",
CONF_IMAGE_MODEL: "gpt-image-1.5",
CONF_IMAGE_MODEL: "gpt-image-2",
CONF_MAX_TOKENS: 200,
CONF_STORE_RESPONSES: True,
CONF_TEMPERATURE: 0.5,
+1 -31
View File
@@ -3,12 +3,10 @@
from __future__ import annotations
from typing import Any
from unittest.mock import Mock, patch
from unittest.mock import patch
import pytest
from syrupy.assertion import SnapshotAssertion
from tuya_device_handlers import TUYA_QUIRKS_REGISTRY
from tuya_device_handlers.definition.camera import CameraQuirk, get_default_definition
from tuya_sharing import CustomerDevice, Manager
from homeassistant.components.camera import (
@@ -59,34 +57,6 @@ async def test_platform_setup_and_discovery(
)
@pytest.mark.parametrize("mock_device_code", ["sp_rudejjigkywujjvs"])
@pytest.mark.parametrize(
("get_quirks", "available"),
[
(None, True),
([], False),
([CameraQuirk(key="", definition_fn=get_default_definition)], True),
([CameraQuirk(key="", definition_fn=lambda d: None)], False),
],
)
async def test_empty_quirk(
hass: HomeAssistant,
mock_manager: Manager,
mock_config_entry: MockConfigEntry,
mock_device: CustomerDevice,
get_quirks: list | None,
available: bool,
) -> None:
"""Test None quirks use defaults and empty quirk list skips default entities."""
with patch.object(TUYA_QUIRKS_REGISTRY, "get_quirk_for_device") as mock_get_quirk:
mock_get_quirk.return_value = Mock()
mock_get_quirk.return_value.camera_quirks = get_quirks
await initialize_entry(hass, mock_manager, mock_config_entry, mock_device)
state = hass.states.get("camera.burocam")
assert (state is not None) is available
@pytest.mark.parametrize(
"mock_device_code",
["sp_rudejjigkywujjvs"],
@@ -11,6 +11,9 @@ from unifi_access_api.models.websocket import (
LocationUpdateData,
LocationUpdateState,
LocationUpdateV2,
V2DeviceLocationState,
V2DeviceUpdate,
V2DeviceUpdateData,
WebsocketMessage,
)
@@ -123,3 +126,78 @@ async def test_ws_reconnect_restores_binary_sensor_states(
assert hass.states.get(FRONT_DOOR_ENTITY).state == "off"
assert hass.states.get(BACK_DOOR_ENTITY).state == "on"
async def test_binary_sensor_state_updates_via_v2_device_update(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.data.v2.device.update changes binary sensor state."""
handlers = _get_ws_handlers(mock_client)
update_msg = V2DeviceUpdate(
event="access.data.v2.device.update",
data=V2DeviceUpdateData(
id="hub-device-001",
location_states=[
V2DeviceLocationState(
location_id="door-001",
dps=DoorPositionStatus.OPEN,
)
],
),
)
await handlers["access.data.v2.device.update"](update_msg)
await hass.async_block_till_done()
assert hass.states.get(FRONT_DOOR_ENTITY).state == "on"
async def test_v2_device_update_empty_location_id_ignored(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.data.v2.device.update with empty location_id does not update state."""
handlers = _get_ws_handlers(mock_client)
update_msg = V2DeviceUpdate(
event="access.data.v2.device.update",
data=V2DeviceUpdateData(
id="hub-device-001",
location_states=[V2DeviceLocationState(location_id="")],
),
)
await handlers["access.data.v2.device.update"](update_msg)
await hass.async_block_till_done()
# State should be unchanged (front door starts closed/off)
assert hass.states.get(FRONT_DOOR_ENTITY).state == "off"
async def test_v2_device_update_no_explicit_state_does_not_overwrite(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test v2.device.update without explicit dps/lock does not overwrite known state.
A device association message (only location_id set, no explicit dps/lock)
must not reset a known-open/unlocked door back to closed/locked.
"""
handlers = _get_ws_handlers(mock_client)
# back door starts open (on). Send a device update with no explicit dps/lock.
update_msg = V2DeviceUpdate(
event="access.data.v2.device.update",
data=V2DeviceUpdateData(
id="hub-device-002",
location_states=[V2DeviceLocationState(location_id="door-002")],
),
)
await handlers["access.data.v2.device.update"](update_msg)
await hass.async_block_till_done()
# Back door must still be open/on not silently reset to closed/off
assert hass.states.get(BACK_DOOR_ENTITY).state == "on"
+268
View File
@@ -8,6 +8,9 @@ from unittest.mock import MagicMock, patch
import pytest
from syrupy.assertion import SnapshotAssertion
from unifi_access_api.models.websocket import (
DeviceUpdate,
DeviceUpdateData,
DeviceUpdateDoor,
HwDoorbell,
HwDoorbellData,
InsightsAdd,
@@ -21,6 +24,11 @@ from unifi_access_api.models.websocket import (
LogEvent,
LogSource,
LogTarget,
RemoteView,
RemoteViewData,
V2DeviceLocationState,
V2DeviceUpdate,
V2DeviceUpdateData,
V2LocationUpdate,
V2LocationUpdateData,
WebsocketMessage,
@@ -780,6 +788,266 @@ async def test_logs_add_device_mapping_pruned_on_refresh(
assert hass.states.get(FRONT_DOOR_ACCESS_ENTITY) is None
@pytest.mark.freeze_time("2025-01-01 00:00:00+00:00")
async def test_remote_view_doorbell_ring_by_device_id(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.remote_view fires doorbell ring when device_id is in the mapping."""
handlers = _get_ws_handlers(mock_client)
await _populate_device_mapping(handlers)
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id="hub-device-001"),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.attributes["event_type"] == "ring"
assert state.state == "2025-01-01T00:00:00.000+00:00"
@pytest.mark.freeze_time("2025-01-01 00:00:00+00:00")
async def test_remote_view_doorbell_ring_by_door_name_fallback(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.remote_view falls back to door_name lookup when device_id is unmapped."""
handlers = _get_ws_handlers(mock_client)
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id="unknown-device", door_name="Front Door"),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.attributes["event_type"] == "ring"
assert state.state == "2025-01-01T00:00:00.000+00:00"
async def test_remote_view_unknown_device_and_door_ignored(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.remote_view is ignored when both device_id and door_name are unknown."""
handlers = _get_ws_handlers(mock_client)
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id="unknown-device", door_name="Unknown Door"),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.state == "unknown"
@pytest.mark.freeze_time("2025-01-01 00:00:00+00:00")
async def test_remote_view_device_mapping_via_device_update(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.remote_view resolves device_id populated by access.data.device.update."""
handlers = _get_ws_handlers(mock_client)
device_update_msg = DeviceUpdate(
event="access.data.device.update",
data=DeviceUpdateData(
unique_id="intercom-device-001",
door=DeviceUpdateDoor(unique_id="door-001"),
),
)
await handlers["access.data.device.update"](device_update_msg)
await hass.async_block_till_done()
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id="intercom-device-001"),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.attributes["event_type"] == "ring"
assert state.state == "2025-01-01T00:00:00.000+00:00"
@pytest.mark.freeze_time("2025-01-01 00:00:00+00:00")
async def test_remote_view_device_mapping_via_v2_device_update(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.remote_view resolves device_id populated by access.data.v2.device.update."""
handlers = _get_ws_handlers(mock_client)
v2_device_update_msg = V2DeviceUpdate(
event="access.data.v2.device.update",
data=V2DeviceUpdateData(
id="intercom-v2-001",
location_states=[V2DeviceLocationState(location_id="door-001")],
),
)
await handlers["access.data.v2.device.update"](v2_device_update_msg)
await hass.async_block_till_done()
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id="intercom-v2-001"),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.attributes["event_type"] == "ring"
assert state.state == "2025-01-01T00:00:00.000+00:00"
@pytest.mark.freeze_time("2025-01-01 00:00:00+00:00")
async def test_v2_device_update_multiple_location_states_maps_to_first_door(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test device with multiple location_states maps to the first valid door only."""
handlers = _get_ws_handlers(mock_client)
# Device has two location_states; should be mapped to the first door (door-001).
v2_device_update_msg = V2DeviceUpdate(
event="access.data.v2.device.update",
data=V2DeviceUpdateData(
id="hub-multi-001",
location_states=[
V2DeviceLocationState(location_id="door-001"),
V2DeviceLocationState(location_id="door-002"),
],
),
)
await handlers["access.data.v2.device.update"](v2_device_update_msg)
await hass.async_block_till_done()
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id="hub-multi-001"),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
# Should ring front door (door-001), not back door (door-002)
front_state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert front_state is not None
assert front_state.attributes["event_type"] == "ring"
assert front_state.state == "2025-01-01T00:00:00.000+00:00"
back_state = hass.states.get(BACK_DOOR_DOORBELL_ENTITY)
assert back_state is not None
assert back_state.state == "unknown"
async def test_device_update_without_door_does_not_map(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.data.device.update without a door does not populate the mapping."""
handlers = _get_ws_handlers(mock_client)
device_update_msg = DeviceUpdate(
event="access.data.device.update",
data=DeviceUpdateData(unique_id="orphan-device"),
)
await handlers["access.data.device.update"](device_update_msg)
await hass.async_block_till_done()
# Sending a remote_view for that device should not fire a doorbell event
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id="orphan-device"),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.state == "unknown"
async def test_device_update_empty_unique_id_does_not_pollute_mapping(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.data.device.update with empty unique_id does not create mapping."""
handlers = _get_ws_handlers(mock_client)
device_update_msg = DeviceUpdate(
event="access.data.device.update",
data=DeviceUpdateData(
unique_id="",
door=DeviceUpdateDoor(unique_id="door-001"),
),
)
await handlers["access.data.device.update"](device_update_msg)
await hass.async_block_till_done()
# An empty device_id must not accidentally resolve via the empty-string key
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id=""),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.state == "unknown"
async def test_v2_device_update_empty_id_does_not_pollute_mapping(
hass: HomeAssistant,
init_integration: MockConfigEntry,
mock_client: MagicMock,
) -> None:
"""Test access.data.v2.device.update with empty id does not create mapping."""
handlers = _get_ws_handlers(mock_client)
v2_device_update_msg = V2DeviceUpdate(
event="access.data.v2.device.update",
data=V2DeviceUpdateData(
id="",
location_states=[V2DeviceLocationState(location_id="door-001")],
),
)
await handlers["access.data.v2.device.update"](v2_device_update_msg)
await hass.async_block_till_done()
# The empty-string device id must not produce an entry in _device_to_door
remote_view_msg = RemoteView(
event="access.remote_view",
data=RemoteViewData(device_id=""),
)
await handlers["access.remote_view"](remote_view_msg)
await hass.async_block_till_done()
state = hass.states.get(FRONT_DOOR_DOORBELL_ENTITY)
assert state is not None
assert state.state == "unknown"
@pytest.mark.freeze_time("2025-01-01 00:00:00+00:00")
async def test_logs_add_uah_door_via_enriched_door_id(
hass: HomeAssistant,
+128
View File
@@ -1,6 +1,7 @@
"""Tests for the storage helper."""
import asyncio
from compression import zstd
from datetime import timedelta
import json
import os
@@ -1382,3 +1383,130 @@ async def test_load_empty_returns_none_and_read_only(
await store.async_save({"new": "data"})
assert hass_storage[MOCK_KEY]["data"] == MOCK_DATA
assert hass_storage[MOCK_KEY]["version"] == 99
async def test_compress_save_load_round_trip(tmpdir: py.path.local) -> None:
"""Test that a compressed store saves a .zst file and loads back correctly."""
loop = asyncio.get_running_loop()
config_dir = await loop.run_in_executor(None, tmpdir.mkdir, "temp_storage")
async with async_test_home_assistant(config_dir=config_dir.strpath) as hass:
store = storage.Store(hass, MOCK_VERSION, MOCK_KEY, compress=True)
await store.async_save(MOCK_DATA)
storage_path = Path(config_dir.strpath) / ".storage"
zst_file = storage_path / (MOCK_KEY + ".zst")
plain_file = storage_path / MOCK_KEY
assert zst_file.is_file()
assert not plain_file.exists()
raw = zstd.decompress(zst_file.read_bytes())
on_disk = json.loads(raw)
assert on_disk["data"] == MOCK_DATA
loaded = await store.async_load()
assert loaded == MOCK_DATA
await hass.async_stop(force=True)
async def test_compress_migrates_plain_to_compressed(tmpdir: py.path.local) -> None:
"""Test that saving with compress=True removes an existing plain file."""
loop = asyncio.get_running_loop()
config_dir = await loop.run_in_executor(None, tmpdir.mkdir, "temp_storage")
async with async_test_home_assistant(config_dir=config_dir.strpath) as hass:
plain_store = storage.Store(hass, MOCK_VERSION, MOCK_KEY)
await plain_store.async_save(MOCK_DATA)
storage_path = Path(config_dir.strpath) / ".storage"
plain_file = storage_path / MOCK_KEY
assert plain_file.is_file()
compressed_store = storage.Store(hass, MOCK_VERSION, MOCK_KEY, compress=True)
# Before the first compressed write the plain file is still the fallback.
loaded = await compressed_store.async_load()
assert loaded == MOCK_DATA
# Saving with compress=True should write .zst and remove the plain file.
await compressed_store.async_save(MOCK_DATA2)
zst_file = storage_path / (MOCK_KEY + ".zst")
assert zst_file.is_file()
assert not plain_file.exists()
loaded = await compressed_store.async_load()
assert loaded == MOCK_DATA2
await hass.async_stop(force=True)
async def test_compress_corrupt_file(
tmpdir: py.path.local, caplog: pytest.LogCaptureFixture
) -> None:
"""Test that a corrupt .zst file is handled gracefully."""
loop = asyncio.get_running_loop()
config_dir = await loop.run_in_executor(None, tmpdir.mkdir, "temp_storage")
async with async_test_home_assistant(config_dir=config_dir.strpath) as hass:
store = storage.Store(hass, MOCK_VERSION, MOCK_KEY, compress=True)
await store.async_save(MOCK_DATA)
storage_path = Path(config_dir.strpath) / ".storage"
zst_file = storage_path / (MOCK_KEY + ".zst")
def _corrupt_file() -> None:
zst_file.write_bytes(b"this is not valid zstd data")
await hass.async_add_executor_job(_corrupt_file)
loaded = await store.async_load()
assert loaded is None
assert "Unrecoverable error decoding storage" in caplog.text
files = await hass.async_add_executor_job(os.listdir, storage_path)
corrupt_files = [f for f in files if ".corrupt" in f]
assert len(corrupt_files) == 1
await hass.async_stop(force=True)
async def test_compress_store_manager_cache(tmpdir: py.path.local) -> None:
"""Test that compressed stores are cached and served by the store manager."""
loop = asyncio.get_running_loop()
def _setup_mock_storage() -> py.path.local:
config_dir = tmpdir.mkdir("temp_config")
tmp_storage = config_dir.mkdir(".storage")
payload = json.dumps(
{
"version": MOCK_VERSION,
"minor_version": 1,
"key": MOCK_KEY,
"data": MOCK_DATA,
}
).encode()
tmp_storage.join(MOCK_KEY + ".zst").write_binary(zstd.compress(payload))
return config_dir
config_dir = await loop.run_in_executor(None, _setup_mock_storage)
async with async_test_home_assistant(config_dir=config_dir.strpath) as hass:
store_manager = storage.get_internal_store_manager(hass)
await store_manager.async_initialize()
await store_manager.async_preload([MOCK_KEY + ".zst"])
# The cache key for a compressed store is key + ".zst".
result = store_manager.async_fetch(MOCK_KEY + ".zst")
assert result is not None
exists, cached_data = result
assert exists is True
assert cached_data["data"] == MOCK_DATA # type: ignore[index]
store = storage.Store(hass, MOCK_VERSION, MOCK_KEY, compress=True)
loaded = await store.async_load()
assert loaded == MOCK_DATA
await hass.async_stop(force=True)
+21
View File
@@ -140,6 +140,27 @@ def decorator_checker_fixture(hass_decorator, linter) -> BaseChecker:
return type_hint_checker
@pytest.fixture(name="hass_enforce_config_flow_no_polling", scope="package")
def hass_enforce_config_flow_no_polling_fixture() -> ModuleType:
"""Fixture to the content for the config_flow_no_polling check."""
return _load_plugin_from_file(
"hass_enforce_config_flow_no_polling",
"pylint/plugins/hass_enforce_config_flow_no_polling.py",
)
@pytest.fixture(name="enforce_config_flow_no_polling_checker")
def enforce_config_flow_no_polling_checker_fixture(
hass_enforce_config_flow_no_polling, linter
) -> BaseChecker:
"""Fixture to provide a config_flow_no_polling checker."""
checker = hass_enforce_config_flow_no_polling.HassEnforceConfigFlowNoPollingChecker(
linter
)
checker.module = "homeassistant.components.pylint_test"
return checker
@pytest.fixture(name="hass_enforce_runtime_data", scope="package")
def hass_enforce_runtime_data_fixture() -> ModuleType:
"""Fixture to the content for the hass_enforce_runtime_data check."""
@@ -0,0 +1,144 @@
"""Tests for pylint hass_enforce_config_flow_no_polling plugin."""
from __future__ import annotations
import astroid
from pylint.checkers import BaseChecker
from pylint.testutils.unittest_linter import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
import pytest
from . import assert_no_messages
@pytest.mark.parametrize(
("code", "module_name"),
[
pytest.param(
"""
vol.Required(CONF_HOST)
""",
"homeassistant.components.test.config_flow",
id="non_polling_field",
),
pytest.param(
"""
vol.Optional("username")
""",
"homeassistant.components.test.config_flow",
id="non_polling_string_field",
),
pytest.param(
"""
vol.Optional(CONF_SCAN_INTERVAL)
""",
"homeassistant.components.test.sensor",
id="polling_in_sensor_not_flagged",
),
pytest.param(
"""
vol.Optional(CONF_SCAN_INTERVAL)
""",
"some.other.module",
id="outside_components",
),
pytest.param(
"""
vol.Optional("scan_interval", default=30)
""",
"homeassistant.components.test",
id="polling_in_init_not_flagged",
),
pytest.param(
"""
vol.Optional("check_interval")
""",
"homeassistant.components.test.config_flow",
id="unknown_interval_field",
),
pytest.param(
"""
vol.Optional("poll_frequency")
""",
"homeassistant.components.test.config_flow",
id="unknown_frequency_field",
),
],
)
def test_enforce_config_flow_no_polling(
linter: UnittestLinter,
enforce_config_flow_no_polling_checker: BaseChecker,
code: str,
module_name: str,
) -> None:
"""Good test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_polling_checker)
with assert_no_messages(linter):
walker.walk(root_node)
@pytest.mark.parametrize(
("code", "module_name"),
[
pytest.param(
"""
vol.Optional(CONF_SCAN_INTERVAL)
""",
"homeassistant.components.test.config_flow",
id="conf_scan_interval",
),
pytest.param(
"""
vol.Optional("scan_interval", default=30)
""",
"homeassistant.components.test.config_flow",
id="string_scan_interval",
),
pytest.param(
"""
vol.Required("update_interval")
""",
"homeassistant.components.test.config_flow",
id="update_interval",
),
pytest.param(
"""
vol.Optional("update_frequency", default=60)
""",
"homeassistant.components.test.config_flow",
id="update_frequency",
),
pytest.param(
"""
vol.Optional("refresh_interval")
""",
"homeassistant.components.test.config_flow",
id="refresh_interval",
),
pytest.param(
"""
vol.Optional(CONF_UPDATE_INTERVAL)
""",
"homeassistant.components.test.config_flow",
id="conf_update_interval",
),
],
)
def test_enforce_config_flow_no_polling_bad(
linter: UnittestLinter,
enforce_config_flow_no_polling_checker: BaseChecker,
code: str,
module_name: str,
) -> None:
"""Bad test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_polling_checker)
walker.walk(root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "hass-config-flow-polling-field"