Compare commits

..

2 Commits

Author SHA1 Message Date
Paulus Schoutsen
ee3210b239 Test raising OSError 2026-04-15 15:47:44 -04:00
Paulus Schoutsen
8293a10118 Add SerialSelector 2026-04-14 22:35:06 -04:00
44 changed files with 1407 additions and 1430 deletions

View File

@@ -8,7 +8,7 @@ repos:
- id: ruff-format
files: ^((homeassistant|pylint|script|tests)/.+)?[^/]+\.(py|pyi)$
- repo: https://github.com/codespell-project/codespell
rev: v2.4.2
rev: v2.4.1
hooks:
- id: codespell
args:

View File

@@ -152,7 +152,6 @@ _EXPERIMENTAL_CONDITION_PLATFORMS = {
"text",
"timer",
"todo",
"update",
"vacuum",
"valve",
"water_heater",

View File

@@ -54,7 +54,7 @@ class MotionMountErrorStatusSensor(MotionMountEntity, SensorEntity):
def __init__(
self, mm: motionmount.MotionMount, config_entry: MotionMountConfigEntry
) -> None:
"""Initialize sensor entity."""
"""Initialize sensor entiry."""
super().__init__(mm, config_entry)
self._attr_unique_id = f"{self._base_unique_id}-error-status"

View File

@@ -192,7 +192,7 @@ ID_TYPE = BigInteger().with_variant(sqlite.INTEGER, "sqlite")
# For MariaDB and MySQL we can use an unsigned integer type since it will fit 2**32
# for sqlite and postgresql we use a bigint
UINT_32_TYPE = BigInteger().with_variant(
mysql.INTEGER(unsigned=True),
mysql.INTEGER(unsigned=True), # type: ignore[no-untyped-call]
"mysql",
"mariadb",
)
@@ -206,12 +206,12 @@ JSONB_VARIANT_CAST = Text().with_variant(
)
DATETIME_TYPE = (
DateTime(timezone=True)
.with_variant(mysql.DATETIME(timezone=True, fsp=6), "mysql", "mariadb")
.with_variant(mysql.DATETIME(timezone=True, fsp=6), "mysql", "mariadb") # type: ignore[no-untyped-call]
.with_variant(FAST_PYSQLITE_DATETIME(), "sqlite") # type: ignore[no-untyped-call]
)
DOUBLE_TYPE = (
Float()
.with_variant(mysql.DOUBLE(asdecimal=False), "mysql", "mariadb")
.with_variant(mysql.DOUBLE(asdecimal=False), "mysql", "mariadb") # type: ignore[no-untyped-call]
.with_variant(oracle.DOUBLE_PRECISION(), "oracle")
.with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)

View File

@@ -7,7 +7,7 @@
"iot_class": "local_push",
"quality_scale": "internal",
"requirements": [
"SQLAlchemy==2.0.49",
"SQLAlchemy==2.0.41",
"fnv-hash-fast==2.0.0",
"psutil-home-assistant==0.0.1"
]

View File

@@ -447,10 +447,10 @@ def setup_connection_for_dialect(
slow_dependent_subquery = False
if dialect_name == SupportedDialect.SQLITE:
if first_connection:
old_isolation = dbapi_connection.isolation_level
dbapi_connection.isolation_level = None
old_isolation = dbapi_connection.isolation_level # type: ignore[attr-defined]
dbapi_connection.isolation_level = None # type: ignore[attr-defined]
execute_on_connection(dbapi_connection, "PRAGMA journal_mode=WAL")
dbapi_connection.isolation_level = old_isolation
dbapi_connection.isolation_level = old_isolation # type: ignore[attr-defined]
# WAL mode only needs to be setup once
# instead of every time we open the sqlite connection
# as its persistent and isn't free to call every time.

View File

@@ -4,40 +4,27 @@ from __future__ import annotations
import asyncio
from collections.abc import Coroutine
from copy import deepcopy
from datetime import timedelta
import logging
from types import MappingProxyType
from typing import Any
import voluptuous as vol
from homeassistant.components.rest import RESOURCE_SCHEMA, create_rest_data_from_config
from homeassistant.components.sensor import CONF_STATE_CLASS, DOMAIN as SENSOR_DOMAIN
from homeassistant.config_entries import ConfigEntry, ConfigSubentry
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_ATTRIBUTE,
CONF_AUTHENTICATION,
CONF_DEVICE_CLASS,
CONF_HEADERS,
CONF_NAME,
CONF_PASSWORD,
CONF_SCAN_INTERVAL,
CONF_TIMEOUT,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
CONF_USERNAME,
CONF_VALUE_TEMPLATE,
CONF_VERIFY_SSL,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
discovery,
entity_registry as er,
)
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.trigger_template_entity import (
CONF_AVAILABILITY,
TEMPLATE_SENSOR_BASE_SCHEMA,
@@ -45,22 +32,11 @@ from homeassistant.helpers.trigger_template_entity import (
)
from homeassistant.helpers.typing import ConfigType
from .const import (
CONF_ADVANCED,
CONF_AUTH,
CONF_ENCODING,
CONF_INDEX,
CONF_SELECT,
DEFAULT_SCAN_INTERVAL,
DOMAIN,
PLATFORMS,
)
from .const import CONF_INDEX, CONF_SELECT, DEFAULT_SCAN_INTERVAL, DOMAIN, PLATFORMS
from .coordinator import ScrapeCoordinator
type ScrapeConfigEntry = ConfigEntry[ScrapeCoordinator]
_LOGGER = logging.getLogger(__name__)
SENSOR_SCHEMA = vol.Schema(
{
**TEMPLATE_SENSOR_BASE_SCHEMA.schema,
@@ -127,13 +103,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: ScrapeConfigEntry) -> bool:
"""Set up Scrape from a config entry."""
config: dict[str, Any] = dict(entry.options)
# Config flow uses sections but the COMBINED SCHEMA does not
# so we need to flatten the config here
config.update(config.pop(CONF_ADVANCED, {}))
config.update(config.pop(CONF_AUTH, {}))
rest_config: dict[str, Any] = COMBINED_SCHEMA(dict(config))
rest_config: dict[str, Any] = COMBINED_SCHEMA(dict(entry.options))
rest = create_rest_data_from_config(hass, rest_config)
coordinator = ScrapeCoordinator(
@@ -147,159 +117,17 @@ async def async_setup_entry(hass: HomeAssistant, entry: ScrapeConfigEntry) -> bo
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_on_unload(entry.add_update_listener(update_listener))
return True
async def async_migrate_entry(hass: HomeAssistant, entry: ScrapeConfigEntry) -> bool:
"""Migrate old entry."""
if entry.version > 2:
# Don't migrate from future version
return False
if entry.version == 1:
old_to_new_sensor_id = {}
for sensor_config in entry.options[SENSOR_DOMAIN]:
# Create a new sub config entry per sensor
title = sensor_config[CONF_NAME]
old_unique_id = sensor_config[CONF_UNIQUE_ID]
subentry_config = {
CONF_INDEX: sensor_config[CONF_INDEX],
CONF_SELECT: sensor_config[CONF_SELECT],
CONF_ADVANCED: {},
}
for sensor_advanced_key in (
CONF_ATTRIBUTE,
CONF_VALUE_TEMPLATE,
CONF_AVAILABILITY,
CONF_DEVICE_CLASS,
CONF_STATE_CLASS,
CONF_UNIT_OF_MEASUREMENT,
):
if sensor_advanced_key not in sensor_config:
continue
subentry_config[CONF_ADVANCED][sensor_advanced_key] = sensor_config[
sensor_advanced_key
]
new_sub_entry = ConfigSubentry(
data=MappingProxyType(subentry_config),
subentry_type="entity",
title=title,
unique_id=None,
)
_LOGGER.debug(
"Migrating sensor %s with unique id %s to sub config entry id %s, old data %s, new data %s",
title,
old_unique_id,
new_sub_entry.subentry_id,
sensor_config,
subentry_config,
)
old_to_new_sensor_id[old_unique_id] = new_sub_entry.subentry_id
hass.config_entries.async_add_subentry(entry, new_sub_entry)
# Use the new sub config entry id as the unique id for the sensor entity
entity_reg = er.async_get(hass)
entities = er.async_entries_for_config_entry(entity_reg, entry.entry_id)
for entity in entities:
if (old_unique_id := entity.unique_id) in old_to_new_sensor_id:
new_unique_id = old_to_new_sensor_id[old_unique_id]
_LOGGER.debug(
"Migrating entity %s with unique id %s to new unique id %s",
entity.entity_id,
entity.unique_id,
new_unique_id,
)
entity_reg.async_update_entity(
entity.entity_id,
config_entry_id=entry.entry_id,
config_subentry_id=new_unique_id,
new_unique_id=new_unique_id,
)
# Use the new sub config entry id as the identifier for the sensor device
device_reg = dr.async_get(hass)
devices = dr.async_entries_for_config_entry(device_reg, entry.entry_id)
for device in devices:
for domain, identifier in device.identifiers:
if domain != DOMAIN or identifier not in old_to_new_sensor_id:
continue
subentry_id = old_to_new_sensor_id[identifier]
new_identifiers = deepcopy(device.identifiers)
new_identifiers.remove((domain, identifier))
new_identifiers.add((domain, old_to_new_sensor_id[identifier]))
_LOGGER.debug(
"Migrating device %s with identifiers %s to new identifiers %s",
device.id,
device.identifiers,
new_identifiers,
)
device_reg.async_update_device(
device.id,
add_config_entry_id=entry.entry_id,
add_config_subentry_id=subentry_id,
new_identifiers=new_identifiers,
)
# Removing None from the list of subentries if existing
# as the device should only belong to the subentry
# and not the main config entry
device_reg.async_update_device(
device.id,
remove_config_entry_id=entry.entry_id,
remove_config_subentry_id=None,
)
# Update the resource config
new_config_entry_data = dict(entry.options)
new_config_entry_data[CONF_AUTH] = {}
new_config_entry_data[CONF_ADVANCED] = {}
new_config_entry_data.pop(SENSOR_DOMAIN, None)
for resource_advanced_key in (
CONF_HEADERS,
CONF_VERIFY_SSL,
CONF_TIMEOUT,
CONF_ENCODING,
):
if resource_advanced_key in new_config_entry_data:
new_config_entry_data[CONF_ADVANCED][resource_advanced_key] = (
new_config_entry_data.pop(resource_advanced_key)
)
for resource_auth_key in (CONF_AUTHENTICATION, CONF_USERNAME, CONF_PASSWORD):
if resource_auth_key in new_config_entry_data:
new_config_entry_data[CONF_AUTH][resource_auth_key] = (
new_config_entry_data.pop(resource_auth_key)
)
_LOGGER.debug(
"Migrating config entry %s from version 1 to version 2 with data %s",
entry.entry_id,
new_config_entry_data,
)
hass.config_entries.async_update_entry(
entry, version=2, options=new_config_entry_data
)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ScrapeConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload Scrape config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
async def update_listener(hass: HomeAssistant, entry: ScrapeConfigEntry) -> None:
"""Handle config entry update."""
hass.config_entries.async_schedule_reload(entry.entry_id)
async def async_remove_config_entry_device(
hass: HomeAssistant, entry: ConfigEntry, device: dr.DeviceEntry
hass: HomeAssistant, entry: ConfigEntry, device: DeviceEntry
) -> bool:
"""Remove Scrape config entry from a device."""
entity_registry = er.async_get(hass)

View File

@@ -2,13 +2,12 @@
from __future__ import annotations
from copy import deepcopy
import logging
from typing import Any
from collections.abc import Mapping
from typing import Any, cast
import uuid
import voluptuous as vol
from homeassistant import data_entry_flow
from homeassistant.components.rest import create_rest_data_from_config
from homeassistant.components.rest.data import ( # pylint: disable=hass-component-root-import
DEFAULT_TIMEOUT,
@@ -19,17 +18,10 @@ from homeassistant.components.rest.schema import ( # pylint: disable=hass-compo
)
from homeassistant.components.sensor import (
CONF_STATE_CLASS,
DOMAIN as SENSOR_DOMAIN,
SensorDeviceClass,
SensorStateClass,
)
from homeassistant.config_entries import (
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
ConfigSubentryFlow,
OptionsFlow,
SubentryFlowResult,
)
from homeassistant.const import (
CONF_ATTRIBUTE,
CONF_AUTHENTICATION,
@@ -41,6 +33,7 @@ from homeassistant.const import (
CONF_PAYLOAD,
CONF_RESOURCE,
CONF_TIMEOUT,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
CONF_USERNAME,
CONF_VALUE_TEMPLATE,
@@ -49,7 +42,15 @@ from homeassistant.const import (
HTTP_DIGEST_AUTHENTICATION,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import async_get_hass
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.schema_config_entry_flow import (
SchemaCommonFlowHandler,
SchemaConfigFlowHandler,
SchemaFlowError,
SchemaFlowFormStep,
SchemaFlowMenuStep,
)
from homeassistant.helpers.selector import (
BooleanSelector,
NumberSelector,
@@ -68,8 +69,6 @@ from homeassistant.helpers.trigger_template_entity import CONF_AVAILABILITY
from . import COMBINED_SCHEMA
from .const import (
CONF_ADVANCED,
CONF_AUTH,
CONF_ENCODING,
CONF_INDEX,
CONF_SELECT,
@@ -79,212 +78,243 @@ from .const import (
DOMAIN,
)
_LOGGER = logging.getLogger(__name__)
RESOURCE_SETUP = {
vol.Required(CONF_RESOURCE): TextSelector(
TextSelectorConfig(type=TextSelectorType.URL)
),
vol.Optional(CONF_METHOD, default=DEFAULT_METHOD): SelectSelector(
SelectSelectorConfig(options=METHODS, mode=SelectSelectorMode.DROPDOWN)
),
vol.Optional(CONF_PAYLOAD): ObjectSelector(),
vol.Optional(CONF_AUTHENTICATION): SelectSelector(
SelectSelectorConfig(
options=[HTTP_BASIC_AUTHENTICATION, HTTP_DIGEST_AUTHENTICATION],
mode=SelectSelectorMode.DROPDOWN,
)
),
vol.Optional(CONF_USERNAME): TextSelector(),
vol.Optional(CONF_PASSWORD): TextSelector(
TextSelectorConfig(type=TextSelectorType.PASSWORD)
),
vol.Optional(CONF_HEADERS): ObjectSelector(),
vol.Optional(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL): BooleanSelector(),
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): NumberSelector(
NumberSelectorConfig(min=0, step=1, mode=NumberSelectorMode.BOX)
),
vol.Optional(CONF_ENCODING, default=DEFAULT_ENCODING): TextSelector(),
}
RESOURCE_SETUP = vol.Schema(
{
vol.Required(CONF_RESOURCE): TextSelector(
TextSelectorConfig(type=TextSelectorType.URL)
),
vol.Optional(CONF_METHOD, default=DEFAULT_METHOD): SelectSelector(
SelectSelectorConfig(options=METHODS, mode=SelectSelectorMode.DROPDOWN)
),
vol.Optional(CONF_PAYLOAD): ObjectSelector(),
vol.Required(CONF_AUTH): data_entry_flow.section(
vol.Schema(
{
vol.Optional(CONF_AUTHENTICATION): SelectSelector(
SelectSelectorConfig(
options=[
HTTP_BASIC_AUTHENTICATION,
HTTP_DIGEST_AUTHENTICATION,
],
mode=SelectSelectorMode.DROPDOWN,
)
),
vol.Optional(CONF_USERNAME): TextSelector(
TextSelectorConfig(
type=TextSelectorType.TEXT, autocomplete="username"
)
),
vol.Optional(CONF_PASSWORD): TextSelector(
TextSelectorConfig(
type=TextSelectorType.PASSWORD,
autocomplete="current-password",
)
),
}
),
data_entry_flow.SectionConfig(collapsed=True),
),
vol.Required(CONF_ADVANCED): data_entry_flow.section(
vol.Schema(
{
vol.Optional(CONF_HEADERS): ObjectSelector(),
vol.Optional(
CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL
): BooleanSelector(),
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): NumberSelector(
NumberSelectorConfig(min=0, step=1, mode=NumberSelectorMode.BOX)
),
vol.Optional(
CONF_ENCODING, default=DEFAULT_ENCODING
): TextSelector(),
}
),
data_entry_flow.SectionConfig(collapsed=True),
),
}
)
SENSOR_SETUP = vol.Schema(
{
vol.Optional(CONF_NAME, default=DEFAULT_NAME): TextSelector(),
vol.Required(CONF_SELECT): TextSelector(),
vol.Optional(CONF_INDEX, default=0): vol.All(
NumberSelector(
NumberSelectorConfig(min=0, step=1, mode=NumberSelectorMode.BOX)
),
vol.Coerce(int),
),
vol.Required(CONF_ADVANCED): data_entry_flow.section(
vol.Schema(
{
vol.Optional(CONF_ATTRIBUTE): TextSelector(),
vol.Optional(CONF_VALUE_TEMPLATE): TemplateSelector(),
vol.Optional(CONF_AVAILABILITY): TemplateSelector(),
vol.Optional(CONF_DEVICE_CLASS): SelectSelector(
SelectSelectorConfig(
options=[
cls.value
for cls in SensorDeviceClass
if cls != SensorDeviceClass.ENUM
],
mode=SelectSelectorMode.DROPDOWN,
translation_key="device_class",
sort=True,
)
),
vol.Optional(CONF_STATE_CLASS): SelectSelector(
SelectSelectorConfig(
options=[cls.value for cls in SensorStateClass],
mode=SelectSelectorMode.DROPDOWN,
translation_key="state_class",
sort=True,
)
),
vol.Optional(CONF_UNIT_OF_MEASUREMENT): SelectSelector(
SelectSelectorConfig(
options=[cls.value for cls in UnitOfTemperature],
custom_value=True,
mode=SelectSelectorMode.DROPDOWN,
translation_key="unit_of_measurement",
sort=True,
)
),
}
),
data_entry_flow.SectionConfig(collapsed=True),
),
}
)
SENSOR_SETUP = {
vol.Required(CONF_SELECT): TextSelector(),
vol.Optional(CONF_INDEX, default=0): NumberSelector(
NumberSelectorConfig(min=0, step=1, mode=NumberSelectorMode.BOX)
),
vol.Optional(CONF_ATTRIBUTE): TextSelector(),
vol.Optional(CONF_VALUE_TEMPLATE): TemplateSelector(),
vol.Optional(CONF_AVAILABILITY): TemplateSelector(),
vol.Optional(CONF_DEVICE_CLASS): SelectSelector(
SelectSelectorConfig(
options=[
cls.value for cls in SensorDeviceClass if cls != SensorDeviceClass.ENUM
],
mode=SelectSelectorMode.DROPDOWN,
translation_key="device_class",
sort=True,
)
),
vol.Optional(CONF_STATE_CLASS): SelectSelector(
SelectSelectorConfig(
options=[cls.value for cls in SensorStateClass],
mode=SelectSelectorMode.DROPDOWN,
translation_key="state_class",
sort=True,
)
),
vol.Optional(CONF_UNIT_OF_MEASUREMENT): SelectSelector(
SelectSelectorConfig(
options=[cls.value for cls in UnitOfTemperature],
custom_value=True,
mode=SelectSelectorMode.DROPDOWN,
translation_key="unit_of_measurement",
sort=True,
)
),
}
async def validate_rest_setup(
hass: HomeAssistant, user_input: dict[str, Any]
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
) -> dict[str, Any]:
"""Validate rest setup."""
config = deepcopy(user_input)
config.update(config.pop(CONF_ADVANCED, {}))
config.update(config.pop(CONF_AUTH, {}))
rest_config: dict[str, Any] = COMBINED_SCHEMA(config)
hass = async_get_hass()
rest_config: dict[str, Any] = COMBINED_SCHEMA(user_input)
try:
rest = create_rest_data_from_config(hass, rest_config)
await rest.async_update()
except Exception:
_LOGGER.exception("Error when getting resource %s", config[CONF_RESOURCE])
return {"base": "resource_error"}
except Exception as err:
raise SchemaFlowError("resource_error") from err
if rest.data is None:
return {"base": "no_data"}
raise SchemaFlowError("resource_error")
return user_input
async def validate_sensor_setup(
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
) -> dict[str, Any]:
"""Validate sensor input."""
user_input[CONF_INDEX] = int(user_input[CONF_INDEX])
user_input[CONF_UNIQUE_ID] = str(uuid.uuid1())
# Standard behavior is to merge the result with the options.
# In this case, we want to add a sub-item so we update the options directly.
sensors: list[dict[str, Any]] = handler.options.setdefault(SENSOR_DOMAIN, [])
sensors.append(user_input)
return {}
class ScrapeConfigFlow(ConfigFlow, domain=DOMAIN):
"""Scrape configuration flow."""
VERSION = 2
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> ScrapeOptionFlow:
"""Get the options flow for this handler."""
return ScrapeOptionFlow()
@classmethod
@callback
def async_get_supported_subentry_types(
cls, config_entry: ConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return subentries supported by this handler."""
return {"entity": ScrapeSubentryFlowHandler}
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""User flow to create the main config entry."""
errors: dict[str, str] = {}
if user_input is not None:
errors = await validate_rest_setup(self.hass, user_input)
title = user_input[CONF_RESOURCE]
if not errors:
return self.async_create_entry(data={}, options=user_input, title=title)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
RESOURCE_SETUP, user_input or {}
),
errors=errors,
)
async def validate_select_sensor(
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
) -> dict[str, Any]:
"""Store sensor index in flow state."""
handler.flow_state["_idx"] = int(user_input[CONF_INDEX])
return {}
class ScrapeOptionFlow(OptionsFlow):
"""Scrape Options flow."""
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Manage Scrape options."""
errors: dict[str, str] = {}
if user_input is not None:
errors = await validate_rest_setup(self.hass, user_input)
if not errors:
return self.async_create_entry(data=user_input)
return self.async_show_form(
step_id="init",
data_schema=self.add_suggested_values_to_schema(
RESOURCE_SETUP,
user_input or self.config_entry.options,
),
errors=errors,
)
async def get_select_sensor_schema(handler: SchemaCommonFlowHandler) -> vol.Schema:
"""Return schema for selecting a sensor."""
return vol.Schema(
{
vol.Required(CONF_INDEX): vol.In(
{
str(index): config[CONF_NAME]
for index, config in enumerate(handler.options[SENSOR_DOMAIN])
},
)
}
)
class ScrapeSubentryFlowHandler(ConfigSubentryFlow):
"""Handle subentry flow."""
async def get_edit_sensor_suggested_values(
handler: SchemaCommonFlowHandler,
) -> dict[str, Any]:
"""Return suggested values for sensor editing."""
idx: int = handler.flow_state["_idx"]
return dict(handler.options[SENSOR_DOMAIN][idx])
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> SubentryFlowResult:
"""User flow to create a sensor subentry."""
if user_input is not None:
title = user_input.pop("name")
return self.async_create_entry(data=user_input, title=title)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
SENSOR_SETUP, user_input or {}
),
)
async def validate_sensor_edit(
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
) -> dict[str, Any]:
"""Update edited sensor."""
user_input[CONF_INDEX] = int(user_input[CONF_INDEX])
# Standard behavior is to merge the result with the options.
# In this case, we want to add a sub-item so we update the options directly,
# including popping omitted optional schema items.
idx: int = handler.flow_state["_idx"]
handler.options[SENSOR_DOMAIN][idx].update(user_input)
for key in DATA_SCHEMA_EDIT_SENSOR.schema:
if isinstance(key, vol.Optional) and key not in user_input:
# Key not present, delete keys old value (if present) too
handler.options[SENSOR_DOMAIN][idx].pop(key, None)
return {}
async def get_remove_sensor_schema(handler: SchemaCommonFlowHandler) -> vol.Schema:
"""Return schema for sensor removal."""
return vol.Schema(
{
vol.Required(CONF_INDEX): cv.multi_select(
{
str(index): config[CONF_NAME]
for index, config in enumerate(handler.options[SENSOR_DOMAIN])
},
)
}
)
async def validate_remove_sensor(
handler: SchemaCommonFlowHandler, user_input: dict[str, Any]
) -> dict[str, Any]:
"""Validate remove sensor."""
removed_indexes: set[str] = set(user_input[CONF_INDEX])
# Standard behavior is to merge the result with the options.
# In this case, we want to remove sub-items so we update the options directly.
entity_registry = er.async_get(handler.parent_handler.hass)
sensors: list[dict[str, Any]] = []
sensor: dict[str, Any]
for index, sensor in enumerate(handler.options[SENSOR_DOMAIN]):
if str(index) not in removed_indexes:
sensors.append(sensor)
elif entity_id := entity_registry.async_get_entity_id(
SENSOR_DOMAIN, DOMAIN, sensor[CONF_UNIQUE_ID]
):
entity_registry.async_remove(entity_id)
handler.options[SENSOR_DOMAIN] = sensors
return {}
DATA_SCHEMA_RESOURCE = vol.Schema(RESOURCE_SETUP)
DATA_SCHEMA_EDIT_SENSOR = vol.Schema(SENSOR_SETUP)
DATA_SCHEMA_SENSOR = vol.Schema(
{
vol.Optional(CONF_NAME, default=DEFAULT_NAME): TextSelector(),
**SENSOR_SETUP,
}
)
CONFIG_FLOW = {
"user": SchemaFlowFormStep(
schema=DATA_SCHEMA_RESOURCE,
next_step="sensor",
validate_user_input=validate_rest_setup,
),
"sensor": SchemaFlowFormStep(
schema=DATA_SCHEMA_SENSOR,
validate_user_input=validate_sensor_setup,
),
}
OPTIONS_FLOW = {
"init": SchemaFlowMenuStep(
["resource", "add_sensor", "select_edit_sensor", "remove_sensor"]
),
"resource": SchemaFlowFormStep(
DATA_SCHEMA_RESOURCE,
validate_user_input=validate_rest_setup,
),
"add_sensor": SchemaFlowFormStep(
DATA_SCHEMA_SENSOR,
suggested_values=None,
validate_user_input=validate_sensor_setup,
),
"select_edit_sensor": SchemaFlowFormStep(
get_select_sensor_schema,
suggested_values=None,
validate_user_input=validate_select_sensor,
next_step="edit_sensor",
),
"edit_sensor": SchemaFlowFormStep(
DATA_SCHEMA_EDIT_SENSOR,
suggested_values=get_edit_sensor_suggested_values,
validate_user_input=validate_sensor_edit,
),
"remove_sensor": SchemaFlowFormStep(
get_remove_sensor_schema,
suggested_values=None,
validate_user_input=validate_remove_sensor,
),
}
class ScrapeConfigFlowHandler(SchemaConfigFlowHandler, domain=DOMAIN):
"""Handle a config flow for Scrape."""
config_flow = CONFIG_FLOW
options_flow = OPTIONS_FLOW
options_flow_reloads = True
def async_config_entry_title(self, options: Mapping[str, Any]) -> str:
"""Return config entry title."""
return cast(str, options[CONF_RESOURCE])

View File

@@ -14,8 +14,6 @@ DEFAULT_SCAN_INTERVAL = timedelta(minutes=10)
PLATFORMS = [Platform.SENSOR]
CONF_ADVANCED = "advanced"
CONF_AUTH = "auth"
CONF_ENCODING = "encoding"
CONF_SELECT = "select"
CONF_INDEX = "index"

View File

@@ -1,21 +0,0 @@
{
"config": {
"step": {
"user": {
"sections": {
"advanced": "mdi:cog",
"auth": "mdi:lock"
}
}
}
},
"options": {
"step": {
"init": {
"sections": {
"advanced": "mdi:cog"
}
}
}
}
}

View File

@@ -46,10 +46,9 @@ TRIGGER_ENTITY_OPTIONS = (
CONF_AVAILABILITY,
CONF_DEVICE_CLASS,
CONF_ICON,
CONF_NAME,
CONF_PICTURE,
CONF_STATE_CLASS,
CONF_UNIQUE_ID,
CONF_STATE_CLASS,
CONF_UNIT_OF_MEASUREMENT,
)
@@ -71,7 +70,7 @@ async def async_setup_platform(
entities: list[ScrapeSensor] = []
for sensor_config in sensors_config:
trigger_entity_config = {}
trigger_entity_config = {CONF_NAME: sensor_config[CONF_NAME]}
for key in TRIGGER_ENTITY_OPTIONS:
if key not in sensor_config:
continue
@@ -99,24 +98,23 @@ async def async_setup_entry(
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Scrape sensor entry."""
coordinator = entry.runtime_data
for subentry in entry.subentries.values():
sensor = dict(subentry.data)
sensor.update(sensor.pop("advanced", {}))
sensor[CONF_UNIQUE_ID] = subentry.subentry_id
sensor[CONF_NAME] = subentry.title
entities: list = []
coordinator = entry.runtime_data
config = dict(entry.options)
for sensor in config["sensor"]:
sensor_config: ConfigType = vol.Schema(
TEMPLATE_SENSOR_BASE_SCHEMA.schema, extra=vol.ALLOW_EXTRA
)(sensor)
name: str = sensor_config[CONF_NAME]
value_string: str | None = sensor_config.get(CONF_VALUE_TEMPLATE)
value_template: ValueTemplate | None = (
ValueTemplate(value_string, hass) if value_string is not None else None
)
trigger_entity_config: dict[str, str | Template | None] = {}
trigger_entity_config: dict[str, str | Template | None] = {CONF_NAME: name}
for key in TRIGGER_ENTITY_OPTIONS:
if key not in sensor_config:
continue
@@ -125,22 +123,21 @@ async def async_setup_entry(
continue
trigger_entity_config[key] = sensor_config[key]
async_add_entities(
[
ScrapeSensor(
hass,
coordinator,
trigger_entity_config,
sensor_config[CONF_SELECT],
sensor_config.get(CONF_ATTRIBUTE),
sensor_config[CONF_INDEX],
value_template,
False,
)
],
config_subentry_id=subentry.subentry_id,
entities.append(
ScrapeSensor(
hass,
coordinator,
trigger_entity_config,
sensor_config[CONF_SELECT],
sensor_config.get(CONF_ATTRIBUTE),
sensor_config[CONF_INDEX],
value_template,
False,
)
)
async_add_entities(entities)
class ScrapeSensor(CoordinatorEntity[ScrapeCoordinator], ManualTriggerSensorEntity):
"""Representation of a web scrape sensor."""

View File

@@ -4,140 +4,134 @@
"already_configured": "[%key:common::config_flow::abort::already_configured_account%]"
},
"error": {
"no_data": "Rest data is empty. Verify your configuration",
"resource_error": "Could not update rest data. Verify your configuration"
},
"step": {
"user": {
"sensor": {
"data": {
"method": "Method",
"payload": "Payload",
"resource": "Resource"
"attribute": "Attribute",
"availability": "Availability template",
"device_class": "Device class",
"index": "Index",
"name": "[%key:common::config_flow::data::name%]",
"select": "Select",
"state_class": "State class",
"unit_of_measurement": "Unit of measurement",
"value_template": "Value template"
},
"data_description": {
"payload": "Payload to use when method is POST.",
"resource": "The URL to the website that contains the value."
},
"sections": {
"advanced": {
"data": {
"encoding": "Character encoding",
"headers": "Headers",
"timeout": "Timeout",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"data_description": {
"encoding": "Character encoding to use. Defaults to UTF-8.",
"headers": "Headers to use for the web request.",
"timeout": "Timeout for connection to website.",
"verify_ssl": "Enables/disables verification of SSL/TLS certificate, for example if it is self-signed."
},
"description": "Provide additional advanced settings for the resource.",
"name": "Advanced settings"
},
"auth": {
"data": {
"authentication": "Select authentication method",
"password": "[%key:common::config_flow::data::password%]",
"username": "[%key:common::config_flow::data::username%]"
},
"data_description": {
"authentication": "Type of the HTTP authentication. Either basic or digest."
},
"description": "Provide authentication details to access the resource.",
"name": "Authentication settings"
}
"attribute": "Get value of an attribute on the selected tag.",
"availability": "Defines a template to get the availability of the sensor.",
"device_class": "The type/class of the sensor to set the icon in the frontend.",
"index": "Defines which of the elements returned by the CSS selector to use.",
"select": "Defines what tag to search for. Check Beautifulsoup CSS selectors for details.",
"state_class": "The state_class of the sensor.",
"unit_of_measurement": "Choose unit of measurement or create your own.",
"value_template": "Defines a template to get the state of the sensor."
}
}
}
},
"config_subentries": {
"entity": {
"entry_type": "Sensor",
"initiate_flow": {
"user": "Add sensor"
},
"step": {
"user": {
"data": {
"index": "Index",
"select": "Select"
},
"data_description": {
"index": "Defines which of the elements returned by the CSS selector to use.",
"select": "Defines what tag to search for. Check Beautifulsoup CSS selectors for details."
},
"sections": {
"advanced": {
"data": {
"attribute": "Attribute",
"availability": "Availability template",
"device_class": "Device class",
"state_class": "State class",
"unit_of_measurement": "Unit of measurement",
"value_template": "Value template"
},
"data_description": {
"attribute": "Get value of an attribute on the selected tag.",
"availability": "Defines a template to get the availability of the sensor.",
"device_class": "The type/class of the sensor to set the icon in the frontend.",
"state_class": "The state_class of the sensor.",
"unit_of_measurement": "Choose unit of measurement or create your own.",
"value_template": "Defines a template to get the state of the sensor."
},
"description": "Provide additional advanced settings for the sensor.",
"name": "Advanced settings"
}
}
"user": {
"data": {
"authentication": "Select authentication method",
"encoding": "Character encoding",
"headers": "Headers",
"method": "Method",
"password": "[%key:common::config_flow::data::password%]",
"payload": "Payload",
"resource": "Resource",
"timeout": "Timeout",
"username": "[%key:common::config_flow::data::username%]",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"data_description": {
"authentication": "Type of the HTTP authentication. Either basic or digest.",
"encoding": "Character encoding to use. Defaults to UTF-8.",
"headers": "Headers to use for the web request.",
"payload": "Payload to use when method is POST.",
"resource": "The URL to the website that contains the value.",
"timeout": "Timeout for connection to website.",
"verify_ssl": "Enables/disables verification of SSL/TLS certificate, for example if it is self-signed."
}
}
}
},
"options": {
"error": {
"no_data": "[%key:component::scrape::config::error::no_data%]",
"resource_error": "[%key:component::scrape::config::error::resource_error%]"
},
"step": {
"init": {
"add_sensor": {
"data": {
"method": "[%key:component::scrape::config::step::user::data::method%]",
"payload": "[%key:component::scrape::config::step::user::data::payload%]",
"resource": "[%key:component::scrape::config::step::user::data::resource%]"
"attribute": "[%key:component::scrape::config::step::sensor::data::attribute%]",
"availability": "[%key:component::scrape::config::step::sensor::data::availability%]",
"device_class": "[%key:component::scrape::config::step::sensor::data::device_class%]",
"index": "[%key:component::scrape::config::step::sensor::data::index%]",
"name": "[%key:common::config_flow::data::name%]",
"select": "[%key:component::scrape::config::step::sensor::data::select%]",
"state_class": "[%key:component::scrape::config::step::sensor::data::state_class%]",
"unit_of_measurement": "[%key:component::scrape::config::step::sensor::data::unit_of_measurement%]",
"value_template": "[%key:component::scrape::config::step::sensor::data::value_template%]"
},
"data_description": {
"payload": "[%key:component::scrape::config::step::user::data_description::payload%]",
"resource": "[%key:component::scrape::config::step::user::data_description::resource%]"
"attribute": "[%key:component::scrape::config::step::sensor::data_description::attribute%]",
"availability": "[%key:component::scrape::config::step::sensor::data_description::availability%]",
"device_class": "[%key:component::scrape::config::step::sensor::data_description::device_class%]",
"index": "[%key:component::scrape::config::step::sensor::data_description::index%]",
"select": "[%key:component::scrape::config::step::sensor::data_description::select%]",
"state_class": "[%key:component::scrape::config::step::sensor::data_description::state_class%]",
"unit_of_measurement": "[%key:component::scrape::config::step::sensor::data_description::unit_of_measurement%]",
"value_template": "[%key:component::scrape::config::step::sensor::data_description::value_template%]"
}
},
"edit_sensor": {
"data": {
"attribute": "[%key:component::scrape::config::step::sensor::data::attribute%]",
"availability": "[%key:component::scrape::config::step::sensor::data::availability%]",
"device_class": "[%key:component::scrape::config::step::sensor::data::device_class%]",
"index": "[%key:component::scrape::config::step::sensor::data::index%]",
"name": "[%key:common::config_flow::data::name%]",
"select": "[%key:component::scrape::config::step::sensor::data::select%]",
"state_class": "[%key:component::scrape::config::step::sensor::data::state_class%]",
"unit_of_measurement": "[%key:component::scrape::config::step::sensor::data::unit_of_measurement%]",
"value_template": "[%key:component::scrape::config::step::sensor::data::value_template%]"
},
"sections": {
"advanced": {
"data": {
"encoding": "[%key:component::scrape::config::step::user::sections::advanced::data::encoding%]",
"headers": "[%key:component::scrape::config::step::user::sections::advanced::data::headers%]",
"timeout": "[%key:component::scrape::config::step::user::sections::advanced::data::timeout%]",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"data_description": {
"encoding": "[%key:component::scrape::config::step::user::sections::advanced::data_description::encoding%]",
"headers": "[%key:component::scrape::config::step::user::sections::advanced::data_description::headers%]",
"timeout": "[%key:component::scrape::config::step::user::sections::advanced::data_description::timeout%]",
"verify_ssl": "[%key:component::scrape::config::step::user::sections::advanced::data_description::verify_ssl%]"
},
"description": "[%key:component::scrape::config::step::user::sections::advanced::description%]",
"name": "[%key:component::scrape::config::step::user::sections::advanced::name%]"
},
"auth": {
"data": {
"authentication": "[%key:component::scrape::config::step::user::sections::auth::data::authentication%]",
"password": "[%key:common::config_flow::data::password%]",
"username": "[%key:common::config_flow::data::username%]"
},
"data_description": {
"authentication": "[%key:component::scrape::config::step::user::sections::auth::data_description::authentication%]"
},
"description": "[%key:component::scrape::config::step::user::sections::auth::description%]",
"name": "[%key:component::scrape::config::step::user::sections::auth::name%]"
}
"data_description": {
"attribute": "[%key:component::scrape::config::step::sensor::data_description::attribute%]",
"availability": "[%key:component::scrape::config::step::sensor::data_description::availability%]",
"device_class": "[%key:component::scrape::config::step::sensor::data_description::device_class%]",
"index": "[%key:component::scrape::config::step::sensor::data_description::index%]",
"select": "[%key:component::scrape::config::step::sensor::data_description::select%]",
"state_class": "[%key:component::scrape::config::step::sensor::data_description::state_class%]",
"unit_of_measurement": "[%key:component::scrape::config::step::sensor::data_description::unit_of_measurement%]",
"value_template": "[%key:component::scrape::config::step::sensor::data_description::value_template%]"
}
},
"init": {
"menu_options": {
"add_sensor": "Add sensor",
"remove_sensor": "Remove sensor",
"resource": "Configure resource",
"select_edit_sensor": "Configure sensor"
}
},
"resource": {
"data": {
"authentication": "[%key:component::scrape::config::step::user::data::authentication%]",
"encoding": "[%key:component::scrape::config::step::user::data::encoding%]",
"headers": "[%key:component::scrape::config::step::user::data::headers%]",
"method": "[%key:component::scrape::config::step::user::data::method%]",
"password": "[%key:common::config_flow::data::password%]",
"payload": "[%key:component::scrape::config::step::user::data::payload%]",
"resource": "[%key:component::scrape::config::step::user::data::resource%]",
"timeout": "[%key:component::scrape::config::step::user::data::timeout%]",
"username": "[%key:common::config_flow::data::username%]",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"data_description": {
"authentication": "[%key:component::scrape::config::step::user::data_description::authentication%]",
"encoding": "[%key:component::scrape::config::step::user::data_description::encoding%]",
"headers": "[%key:component::scrape::config::step::user::data_description::headers%]",
"payload": "[%key:component::scrape::config::step::user::data_description::payload%]",
"resource": "[%key:component::scrape::config::step::user::data_description::resource%]",
"timeout": "[%key:component::scrape::config::step::user::data_description::timeout%]",
"verify_ssl": "[%key:component::scrape::config::step::user::data_description::verify_ssl%]"
}
}
}

View File

@@ -807,7 +807,7 @@ class ShellyConfigFlow(ConfigFlow, domain=DOMAIN):
)
ssid_options = [network["ssid"] for network in sorted_networks]
# Preselect SSID if returning from failed provisioning attempt
# Pre-select SSID if returning from failed provisioning attempt
suggested_values: dict[str, Any] = {}
if self.selected_ssid:
suggested_values[CONF_SSID] = self.selected_ssid
@@ -1086,7 +1086,7 @@ class ShellyConfigFlow(ConfigFlow, domain=DOMAIN):
) -> ConfigFlowResult:
"""Handle failed provisioning - allow retry."""
if user_input is not None:
# User wants to retry - keep selected_ssid so it's preselected
# User wants to retry - keep selected_ssid so it's pre-selected
self.wifi_networks = []
return await self.async_step_wifi_scan()

View File

@@ -6,5 +6,5 @@
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/sql",
"iot_class": "local_polling",
"requirements": ["SQLAlchemy==2.0.49", "sqlparse==0.5.5"]
"requirements": ["SQLAlchemy==2.0.41", "sqlparse==0.5.5"]
}

View File

@@ -24,9 +24,8 @@ from homeassistant.components.recorder.statistics import (
statistics_during_period,
)
from homeassistant.const import UnitOfEnergy
from homeassistant.core import CALLBACK_TYPE, HomeAssistant
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util import dt as dt_util
from homeassistant.util.unit_conversion import EnergyConverter
@@ -93,40 +92,11 @@ def _build_home_data(home: tibber.TibberHome) -> TibberHomeData:
return result
class TibberCoordinator[_DataT](DataUpdateCoordinator[_DataT]):
"""Base Tibber coordinator."""
class TibberDataCoordinator(DataUpdateCoordinator[None]):
"""Handle Tibber data and insert statistics."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: TibberConfigEntry,
*,
name: str,
update_interval: timedelta,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=name,
update_interval=update_interval,
)
self._runtime_data = config_entry.runtime_data
async def _async_get_client(self) -> tibber.Tibber:
"""Get the Tibber client with error handling."""
try:
return await self._runtime_data.async_get_client(self.hass)
except (ClientError, TimeoutError, tibber.exceptions.HttpExceptionError) as err:
raise UpdateFailed(f"Unable to create Tibber client: {err}") from err
class TibberDataCoordinator(TibberCoordinator[None]):
"""Handle Tibber data and insert statistics."""
def __init__(
self,
hass: HomeAssistant,
@@ -136,14 +106,17 @@ class TibberDataCoordinator(TibberCoordinator[None]):
"""Initialize the data handler."""
super().__init__(
hass,
config_entry,
_LOGGER,
config_entry=config_entry,
name=f"Tibber {tibber_connection.name}",
update_interval=timedelta(minutes=20),
)
async def _async_update_data(self) -> None:
"""Update data via API."""
tibber_connection = await self._async_get_client()
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
try:
await tibber_connection.fetch_consumption_data_active_homes()
@@ -159,7 +132,9 @@ class TibberDataCoordinator(TibberCoordinator[None]):
async def _insert_statistics(self) -> None:
"""Insert Tibber statistics."""
tibber_connection = await self._async_get_client()
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
for home in tibber_connection.get_homes():
sensors: list[tuple[str, bool, str | None, str]] = []
if home.hourly_consumption_data:
@@ -279,9 +254,11 @@ class TibberDataCoordinator(TibberCoordinator[None]):
async_add_external_statistics(self.hass, metadata, statistics)
class TibberPriceCoordinator(TibberCoordinator[dict[str, TibberHomeData]]):
class TibberPriceCoordinator(DataUpdateCoordinator[dict[str, TibberHomeData]]):
"""Handle Tibber price data and insert statistics."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
@@ -290,45 +267,12 @@ class TibberPriceCoordinator(TibberCoordinator[dict[str, TibberHomeData]]):
"""Initialize the price coordinator."""
super().__init__(
hass,
config_entry,
_LOGGER,
config_entry=config_entry,
name=f"{DOMAIN} price",
update_interval=timedelta(minutes=1),
)
self._tomorrow_price_poll_threshold_seconds = random.uniform(
3600 * 14, 3600 * 23
)
self._unsub_tomorrow_price_poll: CALLBACK_TYPE | None = None
initial_tomorrow_price_poll = dt_util.start_of_local_day() + timedelta(
seconds=self._tomorrow_price_poll_threshold_seconds
)
if initial_tomorrow_price_poll <= dt_util.utcnow():
initial_tomorrow_price_poll += timedelta(days=1)
self._schedule_tomorrow_price_poll(initial_tomorrow_price_poll)
self._tibber_homes: list[tibber.TibberHome] | None = None
async def async_shutdown(self) -> None:
"""Cancel any scheduled call, and ignore new runs."""
await super().async_shutdown()
if self._unsub_tomorrow_price_poll:
self._unsub_tomorrow_price_poll()
self._unsub_tomorrow_price_poll = None
def _schedule_tomorrow_price_poll(self, point_in_time: datetime) -> None:
"""Schedule the next one-shot tomorrow price poll."""
if point_in_time <= (now := dt_util.utcnow()):
point_in_time = now + timedelta(seconds=1)
if self._unsub_tomorrow_price_poll:
self._unsub_tomorrow_price_poll()
self._unsub_tomorrow_price_poll = async_track_point_in_utc_time(
self.hass,
self._async_handle_tomorrow_price_poll,
point_in_time,
)
async def _async_handle_tomorrow_price_poll(self, _: datetime) -> None:
"""Handle the scheduled tomorrow price poll."""
self._unsub_tomorrow_price_poll = None
await self._fetch_data()
self._tomorrow_price_poll_threshold_seconds = random.uniform(0, 3600 * 10)
def _time_until_next_15_minute(self) -> timedelta:
"""Return time until the next 15-minute boundary (0, 15, 30, 45) in UTC."""
@@ -345,24 +289,10 @@ class TibberPriceCoordinator(TibberCoordinator[dict[str, TibberHomeData]]):
return next_run - now
async def _async_update_data(self) -> dict[str, TibberHomeData]:
if self._tibber_homes is None:
await self._fetch_data()
homes = self._tibber_homes
if homes is None:
raise UpdateFailed("No Tibber homes available")
result = {home.home_id: _build_home_data(home) for home in homes}
self.update_interval = self._time_until_next_15_minute()
return result
async def _fetch_data(self) -> None:
"""Fetch latest price data via API and update cached home data."""
self._schedule_tomorrow_price_poll(
dt_util.utcnow() + timedelta(seconds=random.uniform(60, 60 * 10))
"""Update data via API and return per-home data for sensors."""
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
tibber_connection = await self._async_get_client()
active_homes = tibber_connection.get_homes(only_active=True)
now = dt_util.now()
@@ -400,36 +330,28 @@ class TibberPriceCoordinator(TibberCoordinator[dict[str, TibberHomeData]]):
return False
homes_to_update = [home for home in active_homes if _needs_update(home)]
try:
if homes_to_update:
await asyncio.gather(
*(home.update_info_and_price_info() for home in homes_to_update)
)
except tibber.exceptions.RateLimitExceededError as err:
self._schedule_tomorrow_price_poll(
dt_util.utcnow() + timedelta(seconds=err.retry_after)
)
raise UpdateFailed(
f"Rate limit exceeded, retry after {err.retry_after} seconds",
retry_after=err.retry_after,
) from err
except tibber.exceptions.HttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err})") from err
except tibber.RetryableHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
except tibber.FatalHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
self._schedule_tomorrow_price_poll(
dt_util.start_of_local_day(now)
+ timedelta(days=1, seconds=self._tomorrow_price_poll_threshold_seconds)
)
self._tibber_homes = active_homes
result = {home.home_id: _build_home_data(home) for home in active_homes}
self.update_interval = self._time_until_next_15_minute()
return result
class TibberFetchPriceCoordinator(TibberPriceCoordinator):
"""Backward-compatible alias for the merged price fetch coordinator."""
class TibberDataAPICoordinator(TibberCoordinator[dict[str, TibberDevice]]):
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
"""Fetch and cache Tibber Data API device capabilities."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
@@ -438,10 +360,12 @@ class TibberDataAPICoordinator(TibberCoordinator[dict[str, TibberDevice]]):
"""Initialize the coordinator."""
super().__init__(
hass,
entry,
_LOGGER,
name=f"{DOMAIN} Data API",
update_interval=timedelta(minutes=1),
config_entry=entry,
)
self._runtime_data = entry.runtime_data
self.sensors_by_device: dict[str, dict[str, tibber.data_api.Sensor]] = {}
def _build_sensor_lookup(self, devices: dict[str, TibberDevice]) -> None:
@@ -459,6 +383,15 @@ class TibberDataAPICoordinator(TibberCoordinator[dict[str, TibberDevice]]):
return device_sensors.get(sensor_id)
return None
async def _async_get_client(self) -> tibber.Tibber:
"""Get the Tibber client with error handling."""
try:
return await self._runtime_data.async_get_client(self.hass)
except ConfigEntryAuthFailed:
raise
except (ClientError, TimeoutError, tibber.UserAgentMissingError) as err:
raise UpdateFailed(f"Unable to create Tibber client: {err}") from err
async def _async_setup(self) -> None:
"""Initial load of Tibber Data API devices."""
client = await self._async_get_client()

View File

@@ -1,17 +0,0 @@
"""Provides conditions for updates."""
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.condition import Condition, make_entity_state_condition
from .const import DOMAIN
CONDITIONS: dict[str, type[Condition]] = {
"is_available": make_entity_state_condition(DOMAIN, STATE_ON),
"is_not_available": make_entity_state_condition(DOMAIN, STATE_OFF),
}
async def async_get_conditions(hass: HomeAssistant) -> dict[str, type[Condition]]:
"""Return the update conditions."""
return CONDITIONS

View File

@@ -1,17 +0,0 @@
.condition_common: &condition_common
target:
entity:
domain: update
fields:
behavior:
required: true
default: any
selector:
select:
translation_key: condition_behavior
options:
- all
- any
is_available: *condition_common
is_not_available: *condition_common

View File

@@ -1,12 +1,4 @@
{
"conditions": {
"is_available": {
"condition": "mdi:package-up"
},
"is_not_available": {
"condition": "mdi:package"
}
},
"entity_component": {
"_": {
"default": "mdi:package-up",

View File

@@ -1,28 +1,7 @@
{
"common": {
"condition_behavior_name": "Condition passes if",
"trigger_behavior_name": "Trigger when"
},
"conditions": {
"is_available": {
"description": "Tests if one or more updates are available.",
"fields": {
"behavior": {
"name": "[%key:component::update::common::condition_behavior_name%]"
}
},
"name": "Update is available"
},
"is_not_available": {
"description": "Tests if one or more updates are not available.",
"fields": {
"behavior": {
"name": "[%key:component::update::common::condition_behavior_name%]"
}
},
"name": "Update is not available"
}
},
"device_automation": {
"extra_fields": {
"for": "[%key:common::device_automation::extra_fields::for%]"
@@ -80,12 +59,6 @@
}
},
"selector": {
"condition_behavior": {
"options": {
"all": "All",
"any": "Any"
}
},
"trigger_behavior": {
"options": {
"any": "Any",

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine, Sequence
import dataclasses
from datetime import datetime, timedelta
import logging
import os
@@ -163,6 +164,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
await usb_discovery.async_setup()
hass.data[_USB_DATA] = usb_discovery
websocket_api.async_register_command(hass, websocket_usb_scan)
websocket_api.async_register_command(hass, websocket_usb_list_serial_ports)
return True
@@ -477,3 +479,23 @@ async def websocket_usb_scan(
"""Scan for new usb devices."""
await async_request_scan(hass)
connection.send_result(msg["id"])
@websocket_api.require_admin
@websocket_api.websocket_command({vol.Required("type"): "usb/list_serial_ports"})
@websocket_api.async_response
async def websocket_usb_list_serial_ports(
hass: HomeAssistant,
connection: ActiveConnection,
msg: dict[str, Any],
) -> None:
"""List available serial ports."""
try:
ports = await async_scan_serial_ports(hass)
except OSError as err:
connection.send_error(msg["id"], websocket_api.ERR_UNKNOWN_ERROR, str(err))
return
connection.send_result(
msg["id"],
[dataclasses.asdict(port) for port in ports],
)

View File

@@ -315,7 +315,7 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
return await self.async_step_verify_radio()
# Preselect the currently configured port
# Pre-select the currently configured port
default_port: vol.Undefined | str = vol.UNDEFINED
if self._radio_mgr.device_path is not None:
@@ -345,7 +345,7 @@ class BaseZhaFlow(ConfigEntryBaseFlow):
)
return await self.async_step_manual_port_config()
# Preselect the current radio type
# Pre-select the current radio type
default: vol.Undefined | str = vol.UNDEFINED
if self._radio_mgr.radio_type is not None:

View File

@@ -1771,6 +1771,28 @@ class SelectSelector(Selector[SelectSelectorConfig]):
return [parent_schema(vol.Schema(str)(val)) for val in data]
class SerialSelectorConfig(BaseSelectorConfig):
"""Class to represent a serial selector config."""
@SELECTORS.register("serial")
class SerialSelector(Selector[SerialSelectorConfig]):
"""Selector for a serial port."""
selector_type = "serial"
CONFIG_SCHEMA = make_selector_config_schema()
def __init__(self, config: SerialSelectorConfig | None = None) -> None:
"""Instantiate a selector."""
super().__init__(config)
def __call__(self, data: Any) -> str:
"""Validate the passed selection."""
serial: str = vol.Schema(str)(data)
return serial
class StateSelectorConfig(BaseSelectorConfig, total=False):
"""Class to represent an state selector config."""

View File

@@ -47,7 +47,7 @@ Jinja2==3.1.6
lru-dict==1.3.0
mutagen==1.47.0
openai==2.21.0
orjson==3.11.8
orjson==3.11.7
packaging>=23.1
paho-mqtt==2.1.0
Pillow==12.2.0
@@ -64,7 +64,7 @@ PyTurboJPEG==1.8.0
PyYAML==6.0.3
requests==2.33.1
securetar==2026.4.1
SQLAlchemy==2.0.49
SQLAlchemy==2.0.41
standard-aifc==3.13.0
standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0

View File

@@ -61,14 +61,14 @@ dependencies = [
"Pillow==12.2.0",
"propcache==0.4.1",
"pyOpenSSL==26.0.0",
"orjson==3.11.8",
"orjson==3.11.7",
"packaging>=23.1",
"psutil-home-assistant==0.0.1",
"python-slugify==8.0.4",
"PyYAML==6.0.3",
"requests==2.33.1",
"securetar==2026.4.1",
"SQLAlchemy==2.0.49",
"SQLAlchemy==2.0.41",
"standard-aifc==3.13.0",
"standard-telnetlib==3.13.0",
"typing-extensions>=4.15.0,<5.0",

4
requirements.txt generated
View File

@@ -34,7 +34,7 @@ infrared-protocols==1.1.0
Jinja2==3.1.6
lru-dict==1.3.0
mutagen==1.47.0
orjson==3.11.8
orjson==3.11.7
packaging>=23.1
Pillow==12.2.0
propcache==0.4.1
@@ -48,7 +48,7 @@ PyTurboJPEG==1.8.0
PyYAML==6.0.3
requests==2.33.1
securetar==2026.4.1
SQLAlchemy==2.0.49
SQLAlchemy==2.0.41
standard-aifc==3.13.0
standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0

2
requirements_all.txt generated
View File

@@ -115,7 +115,7 @@ RtmAPI==0.7.2
# homeassistant.components.recorder
# homeassistant.components.sql
SQLAlchemy==2.0.49
SQLAlchemy==2.0.41
# homeassistant.components.tami4
Tami4EdgeAPI==3.0

View File

@@ -8,7 +8,7 @@
-c homeassistant/package_constraints.txt
-r requirements_test_pre_commit.txt
astroid==4.0.4
coverage==7.13.5
coverage==7.10.6
freezegun==1.5.5
# librt is an internal mypy dependency
librt==0.8.1
@@ -22,7 +22,7 @@ pylint-per-file-ignores==3.2.1
pipdeptree==2.26.1
pytest-asyncio==1.3.0
pytest-aiohttp==1.1.0
pytest-cov==7.1.0
pytest-cov==7.0.0
pytest-freezer==0.4.9
pytest-github-actions-annotate-failures==0.3.0
pytest-socket==0.7.0

View File

@@ -112,7 +112,7 @@ RtmAPI==0.7.2
# homeassistant.components.recorder
# homeassistant.components.sql
SQLAlchemy==2.0.49
SQLAlchemy==2.0.41
# homeassistant.components.tami4
Tami4EdgeAPI==3.0

View File

@@ -1,6 +1,6 @@
# Automatically generated from .pre-commit-config.yaml by gen_requirements_all.py, do not edit
codespell==2.4.2
codespell==2.4.1
ruff==0.15.1
yamllint==1.38.0
zizmor==1.23.1

View File

@@ -252,12 +252,6 @@ FORBIDDEN_PACKAGE_FILES_EXCEPTIONS = {
"coinbase": {"homeassistant": {"coinbase-advanced-py"}},
# https://github.com/u9n/dlms-cosem
"dsmr": {"dsmr-parser": {"dlms-cosem"}},
# https://github.com/tkdrob/pyefergy
# pyefergy declares codecov as a runtime dependency, which pulls in
# coverage; coverage ships an 'a1_coverage.pth' file starting from
# 7.13.x. Upstream fix pending in
# https://github.com/tkdrob/pyefergy/pull/47
"efergy": {"codecov": {"coverage"}},
# https://github.com/ChrisMandich/PyFlume # Fixed with >=0.7.1
"fitbit": {
# Setuptools - distutils-precedence.pth

View File

@@ -9,12 +9,12 @@ from homeassistant.core import HomeAssistant
from tests.components.common import (
ConditionStateDescription,
assert_condition_behavior_all,
assert_condition_behavior_any,
assert_condition_gated_by_labs_flag,
create_target_condition,
parametrize_condition_states_all,
parametrize_condition_states_any,
parametrize_target_entities,
set_or_remove_state,
target_entities,
)
@@ -22,7 +22,17 @@ from tests.components.common import (
@pytest.fixture
async def target_fans(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple fan entities associated with different targets."""
return await target_entities(hass, "fan", domain_excluded="switch")
return await target_entities(hass, "fan")
@pytest.fixture
async def target_switches(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple switch entities associated with different targets.
Note: The switches are used to ensure that only fan entities are considered
in the condition evaluation and not other toggle entities.
"""
return await target_entities(hass, "switch")
@pytest.mark.parametrize(
@@ -51,19 +61,18 @@ async def test_fan_conditions_gated_by_labs_flag(
condition="fan.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_any(
condition="fan.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
async def test_fan_state_condition_behavior_any(
hass: HomeAssistant,
target_fans: dict[str, list[str]],
target_switches: dict[str, list[str]],
condition_target_config: dict,
entity_id: str,
entities_in_target: int,
@@ -72,17 +81,39 @@ async def test_fan_state_condition_behavior_any(
states: list[ConditionStateDescription],
) -> None:
"""Test the fan state condition with the 'any' behavior."""
await assert_condition_behavior_any(
other_entity_ids = set(target_fans["included_entities"]) - {entity_id}
# Set all fans, including the tested fan, to the initial state
for eid in target_fans["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_fans,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="any",
)
# Set state for switches to ensure that they don't impact the condition
for state in states:
for eid in target_switches["included_entities"]:
set_or_remove_state(hass, eid, state["included_state"])
await hass.async_block_till_done()
assert condition(hass) is False
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
# Check if changing other fans also passes the condition
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
@@ -96,13 +127,11 @@ async def test_fan_state_condition_behavior_any(
condition="fan.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_all(
condition="fan.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
@@ -117,13 +146,33 @@ async def test_fan_state_condition_behavior_all(
states: list[ConditionStateDescription],
) -> None:
"""Test the fan state condition with the 'all' behavior."""
await assert_condition_behavior_all(
# Set state for two switches to ensure that they don't impact the condition
hass.states.async_set("switch.label_switch_1", STATE_OFF)
hass.states.async_set("switch.label_switch_2", STATE_ON)
other_entity_ids = set(target_fans["included_entities"]) - {entity_id}
# Set all fans, including the tested fan, to the initial state
for eid in target_fans["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_fans,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="all",
)
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true_first_entity"]
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]

View File

@@ -13,9 +13,11 @@ from tests.components.common import (
assert_condition_behavior_all,
assert_condition_behavior_any,
assert_condition_gated_by_labs_flag,
create_target_condition,
parametrize_condition_states_all,
parametrize_condition_states_any,
parametrize_target_entities,
set_or_remove_state,
target_entities,
)
@@ -133,7 +135,17 @@ def parametrize_brightness_condition_states_all(
@pytest.fixture
async def target_lights(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple light entities associated with different targets."""
return await target_entities(hass, "light", domain_excluded="switch")
return await target_entities(hass, "light")
@pytest.fixture
async def target_switches(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple switch entities associated with different targets.
Note: The switches are used to ensure that only light entities are considered
in the condition evaluation and not other toggle entities.
"""
return await target_entities(hass, "switch")
@pytest.mark.parametrize(
@@ -163,19 +175,18 @@ async def test_light_conditions_gated_by_labs_flag(
condition="light.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_any(
condition="light.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
async def test_light_state_condition_behavior_any(
hass: HomeAssistant,
target_lights: dict[str, list[str]],
target_switches: dict[str, list[str]],
condition_target_config: dict,
entity_id: str,
entities_in_target: int,
@@ -184,17 +195,39 @@ async def test_light_state_condition_behavior_any(
states: list[ConditionStateDescription],
) -> None:
"""Test the light state condition with the 'any' behavior."""
await assert_condition_behavior_any(
other_entity_ids = set(target_lights["included_entities"]) - {entity_id}
# Set all lights, including the tested light, to the initial state
for eid in target_lights["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_lights,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="any",
)
# Set state for switches to ensure that they don't impact the condition
for state in states:
for eid in target_switches["included_entities"]:
set_or_remove_state(hass, eid, state["included_state"])
await hass.async_block_till_done()
assert condition(hass) is False
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
# Check if changing other lights also passes the condition
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
@@ -208,13 +241,11 @@ async def test_light_state_condition_behavior_any(
condition="light.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_all(
condition="light.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
@@ -229,17 +260,37 @@ async def test_light_state_condition_behavior_all(
states: list[ConditionStateDescription],
) -> None:
"""Test the light state condition with the 'all' behavior."""
await assert_condition_behavior_all(
# Set state for two switches to ensure that they don't impact the condition
hass.states.async_set("switch.label_switch_1", STATE_OFF)
hass.states.async_set("switch.label_switch_2", STATE_ON)
other_entity_ids = set(target_lights["included_entities"]) - {entity_id}
# Set all lights, including the tested light, to the initial state
for eid in target_lights["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_lights,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="all",
)
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true_first_entity"]
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(

View File

@@ -2830,7 +2830,7 @@ async def test_clean_up_registry_monitoring(
}
# Publish it config
# Since it is not enabled_by_default the sensor will not be loaded
# it should register a hook for monitoring the entity registry
# it should register a hook for monitoring the entiry registry
async_fire_mqtt_message(
hass,
"homeassistant/sensor/sbfspot_0/sbfspot_12345/config",

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from collections.abc import Generator
from typing import Any
from unittest.mock import AsyncMock, patch
import uuid
import pytest
@@ -25,8 +26,10 @@ from homeassistant.components.scrape.const import (
from homeassistant.config_entries import SOURCE_USER
from homeassistant.const import (
CONF_METHOD,
CONF_NAME,
CONF_RESOURCE,
CONF_TIMEOUT,
CONF_UNIQUE_ID,
CONF_VERIFY_SSL,
)
from homeassistant.core import HomeAssistant
@@ -46,9 +49,9 @@ def mock_setup_entry() -> Generator[AsyncMock]:
yield mock_setup_entry
@pytest.fixture(name="get_resource_config")
async def get_resource_config_to_integration_load() -> dict[str, Any]:
"""Return default minimal configuration for resource.
@pytest.fixture(name="get_config")
async def get_config_to_integration_load() -> dict[str, Any]:
"""Return default minimal configuration.
To override the config, tests can be marked with:
@pytest.mark.parametrize("get_config", [{...}])
@@ -56,33 +59,20 @@ async def get_resource_config_to_integration_load() -> dict[str, Any]:
return {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: DEFAULT_METHOD,
"auth": {},
"advanced": {
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: DEFAULT_TIMEOUT,
CONF_ENCODING: DEFAULT_ENCODING,
},
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: DEFAULT_TIMEOUT,
CONF_ENCODING: DEFAULT_ENCODING,
"sensor": [
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
}
],
}
@pytest.fixture(name="get_sensor_config")
async def get_sensor_config_to_integration_load() -> tuple[dict[str, Any], ...]:
"""Return default minimal configuration for sensor.
To override the config, tests can be marked with:
@pytest.mark.parametrize("get_config", [{...}])
"""
return (
{
"data": {"advanced": {}, CONF_INDEX: 0, CONF_SELECT: ".current-version h1"},
"subentry_id": "01JZN07D8D23994A49YKS649S7",
"subentry_type": "entity",
"title": "Current version",
"unique_id": None,
},
)
@pytest.fixture(name="get_data")
async def get_data_to_integration_load() -> MockRestData:
"""Return RestData.
@@ -95,19 +85,14 @@ async def get_data_to_integration_load() -> MockRestData:
@pytest.fixture(name="loaded_entry")
async def load_integration(
hass: HomeAssistant,
get_resource_config: dict[str, Any],
get_sensor_config: tuple[dict[str, Any], ...],
get_data: MockRestData,
hass: HomeAssistant, get_config: dict[str, Any], get_data: MockRestData
) -> MockConfigEntry:
"""Set up the Scrape integration in Home Assistant."""
config_entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
options=get_resource_config,
entry_id="01JZN04ZJ9BQXXGXDS05WS7D6P",
subentries_data=get_sensor_config,
version=2,
options=get_config,
entry_id="1",
)
config_entry.add_to_hass(hass)
@@ -120,3 +105,13 @@ async def load_integration(
await hass.async_block_till_done()
return config_entry
@pytest.fixture(autouse=True)
def uuid_fixture() -> str:
"""Automatically path uuid generator."""
with patch(
"homeassistant.components.scrape.config_flow.uuid.uuid1",
return_value=uuid.UUID("3699ef88-69e6-11ed-a1eb-0242ac120002"),
):
yield

View File

@@ -1,153 +0,0 @@
# serializer version: 1
# name: test_migrate_from_version_1_to_2[device_registry]
DeviceRegistryEntrySnapshot({
'area_id': None,
'config_entries': <ANY>,
'config_entries_subentries': <ANY>,
'configuration_url': None,
'connections': set({
}),
'disabled_by': None,
'entry_type': <DeviceEntryType.SERVICE: 'service'>,
'hw_version': None,
'id': <ANY>,
'identifiers': set({
tuple(
'scrape',
'01JZQ1G63X2DX66GZ9ZTFY9PEH',
),
}),
'labels': set({
}),
'manufacturer': 'Scrape',
'model': None,
'model_id': None,
'name': 'Current version',
'name_by_user': None,
'primary_config_entry': <ANY>,
'serial_number': None,
'sw_version': None,
'via_device_id': None,
})
# ---
# name: test_migrate_from_version_1_to_2[entity_registry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': None,
'entity_id': 'sensor.current_version',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': None,
'platform': 'scrape',
'previous_unique_id': 'a0bde946-5c96-11f0-b55f-0242ac110002',
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '01JZQ1G63X2DX66GZ9ZTFY9PEH',
'unit_of_measurement': None,
})
# ---
# name: test_migrate_from_version_1_to_2[post_migration_config_entry]
ConfigEntrySnapshot({
'data': dict({
}),
'disabled_by': None,
'discovery_keys': dict({
}),
'domain': 'scrape',
'entry_id': <ANY>,
'minor_version': 1,
'options': dict({
'advanced': dict({
'encoding': 'UTF-8',
'timeout': 10.0,
'verify_ssl': True,
}),
'auth': dict({
'password': 'pass',
'username': 'user',
}),
'method': 'GET',
'resource': 'http://www.home-assistant.io',
}),
'pref_disable_new_entities': False,
'pref_disable_polling': False,
'source': 'user',
'subentries': list([
dict({
'data': dict({
'advanced': dict({
'value_template': '{{ value }}',
}),
'index': 0,
'select': '.release-date',
}),
'subentry_id': '01JZQ1G63X2DX66GZ9ZTFY9PEH',
'subentry_type': 'entity',
'title': 'Current version',
'unique_id': None,
}),
]),
'title': 'Mock Title',
'unique_id': None,
'version': 2,
})
# ---
# name: test_migrate_from_version_1_to_2[pre_migration_config_entry]
ConfigEntrySnapshot({
'data': dict({
}),
'disabled_by': None,
'discovery_keys': dict({
}),
'domain': 'scrape',
'entry_id': <ANY>,
'minor_version': 1,
'options': dict({
'encoding': 'UTF-8',
'method': 'GET',
'password': 'pass',
'resource': 'http://www.home-assistant.io',
'sensor': list([
dict({
'index': 0,
'name': 'Current version',
'select': '.release-date',
'unique_id': 'a0bde946-5c96-11f0-b55f-0242ac110002',
'value_template': '{{ value }}',
}),
]),
'timeout': 10.0,
'username': 'user',
'verify_ssl': True,
}),
'pref_disable_new_entities': False,
'pref_disable_polling': False,
'source': 'user',
'subentries': list([
]),
'title': 'Mock Title',
'unique_id': None,
'version': 1,
})
# ---

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from unittest.mock import AsyncMock, patch
import uuid
from homeassistant import config_entries
from homeassistant.components.rest.data import ( # pylint: disable=hass-component-root-import
@@ -13,21 +14,25 @@ from homeassistant.components.rest.schema import ( # pylint: disable=hass-compo
)
from homeassistant.components.scrape import DOMAIN
from homeassistant.components.scrape.const import (
CONF_ADVANCED,
CONF_AUTH,
CONF_ENCODING,
CONF_INDEX,
CONF_SELECT,
DEFAULT_ENCODING,
DEFAULT_VERIFY_SSL,
)
from homeassistant.components.sensor import CONF_STATE_CLASS
from homeassistant.const import (
CONF_DEVICE_CLASS,
CONF_METHOD,
CONF_NAME,
CONF_PASSWORD,
CONF_PAYLOAD,
CONF_RESOURCE,
CONF_TIMEOUT,
CONF_UNIQUE_ID,
CONF_UNIT_OF_MEASUREMENT,
CONF_USERNAME,
CONF_VALUE_TEMPLATE,
CONF_VERIFY_SSL,
)
from homeassistant.core import HomeAssistant
@@ -39,7 +44,7 @@ from . import MockRestData
from tests.common import MockConfigEntry
async def test_entry_and_subentry(
async def test_form(
hass: HomeAssistant, get_data: MockRestData, mock_setup_entry: AsyncMock
) -> None:
"""Test we get the form."""
@@ -54,55 +59,47 @@ async def test_entry_and_subentry(
"homeassistant.components.rest.RestData",
return_value=get_data,
) as mock_data:
result = await hass.config_entries.flow.async_configure(
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
)
await hass.async_block_till_done()
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"],
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0.0,
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["version"] == 2
assert result["options"] == {
assert result3["type"] is FlowResultType.CREATE_ENTRY
assert result3["version"] == 1
assert result3["options"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0.0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
}
],
}
assert len(mock_data.mock_calls) == 1
assert len(mock_setup_entry.mock_calls) == 1
entry_id = result["result"].entry_id
result = await hass.config_entries.subentries.async_init(
(entry_id, "entity"), context={"source": config_entries.SOURCE_USER}
)
assert result["step_id"] == "user"
assert result["type"] is FlowResultType.FORM
result = await hass.config_entries.subentries.async_configure(
result["flow_id"],
{CONF_INDEX: 0, CONF_SELECT: ".current-version h1", CONF_ADVANCED: {}},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_INDEX: 0,
CONF_SELECT: ".current-version h1",
CONF_ADVANCED: {},
}
async def test_form_with_post(
hass: HomeAssistant, get_data: MockRestData, mock_setup_entry: AsyncMock
@@ -119,32 +116,44 @@ async def test_form_with_post(
"homeassistant.components.rest.RestData",
return_value=get_data,
) as mock_data:
result = await hass.config_entries.flow.async_configure(
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_PAYLOAD: "POST",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
)
await hass.async_block_till_done()
result3 = await hass.config_entries.flow.async_configure(
result2["flow_id"],
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0.0,
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["version"] == 2
assert result["options"] == {
assert result3["type"] is FlowResultType.CREATE_ENTRY
assert result3["version"] == 1
assert result3["options"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_PAYLOAD: "POST",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0.0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
}
],
}
assert len(mock_data.mock_calls) == 1
@@ -167,68 +176,74 @@ async def test_flow_fails(
"homeassistant.components.rest.RestData",
side_effect=HomeAssistantError,
):
result = await hass.config_entries.flow.async_configure(
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
)
assert result["errors"] == {"base": "resource_error"}
assert result2["errors"] == {"base": "resource_error"}
with patch(
"homeassistant.components.rest.RestData",
return_value=MockRestData("test_scrape_sensor_no_data"),
):
result = await hass.config_entries.flow.async_configure(
result2 = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
)
assert result["errors"] == {"base": "no_data"}
assert result2["errors"] == {"base": "resource_error"}
with patch(
"homeassistant.components.rest.RestData",
return_value=get_data,
):
result = await hass.config_entries.flow.async_configure(
result3 = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
},
)
await hass.async_block_till_done()
result4 = await hass.config_entries.flow.async_configure(
result3["flow_id"],
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0.0,
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "https://www.home-assistant.io"
assert result["options"] == {
assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == "https://www.home-assistant.io"
assert result4["options"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_AUTH: {},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0.0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
}
],
}
@@ -242,9 +257,17 @@ async def test_options_resource_flow(
result = await hass.config_entries.options.async_init(loaded_entry.entry_id)
assert result["type"] is FlowResultType.FORM
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"next_step_id": "resource"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "resource"
mocker = MockRestData("test_scrape_sensor2")
with patch("homeassistant.components.rest.RestData", return_value=mocker):
result = await hass.config_entries.options.async_configure(
@@ -252,15 +275,11 @@ async def test_options_resource_flow(
user_input={
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: DEFAULT_METHOD,
CONF_AUTH: {
CONF_USERNAME: "secret_username",
CONF_PASSWORD: "secret_password",
},
CONF_ADVANCED: {
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: DEFAULT_TIMEOUT,
CONF_ENCODING: DEFAULT_ENCODING,
},
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: DEFAULT_TIMEOUT,
CONF_ENCODING: DEFAULT_ENCODING,
CONF_USERNAME: "secret_username",
CONF_PASSWORD: "secret_password",
},
)
await hass.async_block_till_done()
@@ -269,15 +288,19 @@ async def test_options_resource_flow(
assert result["data"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_AUTH: {
CONF_USERNAME: "secret_username",
CONF_PASSWORD: "secret_password",
},
CONF_ADVANCED: {
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
},
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10.0,
CONF_ENCODING: "UTF-8",
CONF_USERNAME: "secret_username",
CONF_PASSWORD: "secret_password",
"sensor": [
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0.0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
}
],
}
await hass.async_block_till_done()
@@ -288,3 +311,351 @@ async def test_options_resource_flow(
# Check the state of the entity has changed as expected
state = hass.states.get("sensor.current_version")
assert state.state == "Hidden Version: 2021.12.10"
async def test_options_add_remove_sensor_flow(
hass: HomeAssistant, loaded_entry: MockConfigEntry
) -> None:
"""Test options flow to add and remove a sensor."""
state = hass.states.get("sensor.current_version")
assert state.state == "Current Version: 2021.12.10"
result = await hass.config_entries.options.async_init(loaded_entry.entry_id)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"next_step_id": "add_sensor"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "add_sensor"
mocker = MockRestData("test_scrape_sensor2")
with (
patch("homeassistant.components.rest.RestData", return_value=mocker),
patch(
"homeassistant.components.scrape.config_flow.uuid.uuid1",
return_value=uuid.UUID("3699ef88-69e6-11ed-a1eb-0242ac120003"),
),
):
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_NAME: "Template",
CONF_SELECT: "template",
CONF_INDEX: 0.0,
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Current version",
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
},
{
CONF_NAME: "Template",
CONF_SELECT: "template",
CONF_INDEX: 0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120003",
},
],
}
await hass.async_block_till_done()
# Check the entity was updated, with the new entity
assert len(hass.states.async_all()) == 2
# Check the state of the entity has changed as expected
state = hass.states.get("sensor.current_version")
assert state.state == "Hidden Version: 2021.12.10"
state = hass.states.get("sensor.template")
assert state.state == "Trying to get"
# Now remove the original sensor
result = await hass.config_entries.options.async_init(loaded_entry.entry_id)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"next_step_id": "remove_sensor"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "remove_sensor"
mocker = MockRestData("test_scrape_sensor2")
with patch("homeassistant.components.rest.RestData", return_value=mocker):
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_INDEX: ["0"],
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Template",
CONF_SELECT: "template",
CONF_INDEX: 0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120003",
},
],
}
await hass.async_block_till_done()
# Check the original entity was removed, with only the new entity left
assert len(hass.states.async_all()) == 1
# Check the state of the new entity
state = hass.states.get("sensor.template")
assert state.state == "Trying to get"
async def test_options_edit_sensor_flow(
hass: HomeAssistant, loaded_entry: MockConfigEntry
) -> None:
"""Test options flow to edit a sensor."""
state = hass.states.get("sensor.current_version")
assert state.state == "Current Version: 2021.12.10"
result = await hass.config_entries.options.async_init(loaded_entry.entry_id)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"next_step_id": "select_edit_sensor"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "select_edit_sensor"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"index": "0"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_sensor"
mocker = MockRestData("test_scrape_sensor2")
with patch("homeassistant.components.rest.RestData", return_value=mocker):
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SELECT: "template",
CONF_INDEX: 0.0,
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Current version",
CONF_SELECT: "template",
CONF_INDEX: 0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
},
],
}
await hass.async_block_till_done()
# Check the entity was updated
assert len(hass.states.async_all()) == 1
# Check the state of the entity has changed as expected
state = hass.states.get("sensor.current_version")
assert state.state == "Trying to get"
async def test_sensor_options_add_device_class(
hass: HomeAssistant, mock_setup_entry: AsyncMock
) -> None:
"""Test options flow to edit a sensor."""
entry = MockConfigEntry(
domain=DOMAIN,
options={
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: DEFAULT_METHOD,
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: DEFAULT_TIMEOUT,
CONF_ENCODING: DEFAULT_ENCODING,
"sensor": [
{
CONF_NAME: "Current Temp",
CONF_SELECT: ".current-temp h3",
CONF_INDEX: 0,
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
}
],
},
entry_id="1",
)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"next_step_id": "select_edit_sensor"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "select_edit_sensor"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"index": "0"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_sensor"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SELECT: ".current-temp h3",
CONF_INDEX: 0.0,
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
CONF_DEVICE_CLASS: "temperature",
CONF_STATE_CLASS: "measurement",
CONF_UNIT_OF_MEASUREMENT: "°C",
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Current Temp",
CONF_SELECT: ".current-temp h3",
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
CONF_INDEX: 0,
CONF_DEVICE_CLASS: "temperature",
CONF_STATE_CLASS: "measurement",
CONF_UNIT_OF_MEASUREMENT: "°C",
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
},
],
}
async def test_sensor_options_remove_device_class(
hass: HomeAssistant, mock_setup_entry: AsyncMock
) -> None:
"""Test options flow to edit a sensor."""
entry = MockConfigEntry(
domain=DOMAIN,
options={
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: DEFAULT_METHOD,
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: DEFAULT_TIMEOUT,
CONF_ENCODING: DEFAULT_ENCODING,
"sensor": [
{
CONF_NAME: "Current Temp",
CONF_SELECT: ".current-temp h3",
CONF_INDEX: 0,
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
CONF_DEVICE_CLASS: "temperature",
CONF_STATE_CLASS: "measurement",
CONF_UNIT_OF_MEASUREMENT: "°C",
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
}
],
},
entry_id="1",
)
entry.add_to_hass(hass)
result = await hass.config_entries.options.async_init(entry.entry_id)
assert result["type"] is FlowResultType.MENU
assert result["step_id"] == "init"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"next_step_id": "select_edit_sensor"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "select_edit_sensor"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
{"index": "0"},
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "edit_sensor"
result = await hass.config_entries.options.async_configure(
result["flow_id"],
user_input={
CONF_SELECT: ".current-temp h3",
CONF_INDEX: 0.0,
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
},
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"] == {
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_VERIFY_SSL: True,
CONF_TIMEOUT: 10,
CONF_ENCODING: "UTF-8",
"sensor": [
{
CONF_NAME: "Current Temp",
CONF_SELECT: ".current-temp h3",
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
CONF_INDEX: 0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
},
],
}

View File

@@ -2,18 +2,14 @@
from __future__ import annotations
from dataclasses import dataclass
from http import HTTPStatus
from typing import Any
from unittest.mock import patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.scrape.const import DEFAULT_SCAN_INTERVAL, DOMAIN
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.config_entries import SOURCE_USER, ConfigEntryState, ConfigSubentry
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.setup import async_setup_component
@@ -197,137 +193,3 @@ async def test_resource_template(
await hass.async_block_till_done(wait_background_tasks=True)
state = hass.states.get("sensor.template_sensor")
assert state.state == "Second"
async def test_migrate_from_future(
hass: HomeAssistant,
get_resource_config: dict[str, Any],
get_sensor_config: tuple[dict[str, Any], ...],
get_data: MockRestData,
) -> None:
"""Test migration from future version fails."""
config_entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
options=get_resource_config,
entry_id="01JZN04ZJ9BQXXGXDS05WS7D6P",
subentries_data=get_sensor_config,
version=3,
)
config_entry.add_to_hass(hass)
with patch(
"homeassistant.components.rest.RestData",
return_value=get_data,
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.MIGRATION_ERROR
async def test_migrate_from_version_1_to_2(
hass: HomeAssistant,
get_data: MockRestData,
device_registry: dr.DeviceRegistry,
entity_registry: er.EntityRegistry,
snapshot: SnapshotAssertion,
) -> None:
"""Test migration from version 1.1 to 2.1 with config subentries."""
@dataclass(frozen=True, kw_only=True)
class MockConfigSubentry(ConfigSubentry):
"""Container for a configuration subentry."""
subentry_id: str = "01JZQ1G63X2DX66GZ9ZTFY9PEH"
config_entry = MockConfigEntry(
domain=DOMAIN,
source=SOURCE_USER,
options={
"encoding": "UTF-8",
"method": "GET",
"resource": "http://www.home-assistant.io",
"username": "user",
"password": "pass",
"sensor": [
{
"index": 0,
"name": "Current version",
"select": ".release-date",
"unique_id": "a0bde946-5c96-11f0-b55f-0242ac110002",
"value_template": "{{ value }}",
}
],
"timeout": 10.0,
"verify_ssl": True,
},
entry_id="01JZN04ZJ9BQXXGXDS05WS7D6P",
version=1,
)
config_entry.add_to_hass(hass)
device = device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
entry_type=dr.DeviceEntryType.SERVICE,
identifiers={(DOMAIN, "a0bde946-5c96-11f0-b55f-0242ac110002")},
manufacturer="Scrape",
name="Current version",
)
entity_registry.async_get_or_create(
SENSOR_DOMAIN,
DOMAIN,
"a0bde946-5c96-11f0-b55f-0242ac110002",
config_entry=config_entry,
device_id=device.id,
original_name="Current version",
has_entity_name=True,
suggested_object_id="current_version",
)
assert hass.config_entries.async_get_entry(config_entry.entry_id) == snapshot(
name="pre_migration_config_entry"
)
with (
patch(
"homeassistant.components.rest.RestData",
return_value=get_data,
),
patch("homeassistant.components.scrape.ConfigSubentry", MockConfigSubentry),
):
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done(wait_background_tasks=True)
assert config_entry.state is ConfigEntryState.LOADED
assert hass.config_entries.async_get_entry(config_entry.entry_id) == snapshot(
name="post_migration_config_entry"
)
device = device_registry.async_get(device.id)
assert device == snapshot(name="device_registry")
entity = entity_registry.async_get("sensor.current_version")
assert entity == snapshot(name="entity_registry")
assert config_entry.subentries == {
"01JZQ1G63X2DX66GZ9ZTFY9PEH": MockConfigSubentry(
data={
"advanced": {"value_template": "{{ value }}"},
"index": 0,
"select": ".release-date",
},
subentry_id="01JZQ1G63X2DX66GZ9ZTFY9PEH",
subentry_type="entity",
title="Current version",
unique_id=None,
),
}
assert device.config_entries == {"01JZN04ZJ9BQXXGXDS05WS7D6P"}
assert device.config_entries_subentries == {
"01JZN04ZJ9BQXXGXDS05WS7D6P": {
"01JZQ1G63X2DX66GZ9ZTFY9PEH",
},
}
assert entity.config_entry_id == config_entry.entry_id
assert entity.config_subentry_id == "01JZQ1G63X2DX66GZ9ZTFY9PEH"
state = hass.states.get("sensor.current_version")
assert state.state == "January 17, 2022"

View File

@@ -18,6 +18,7 @@ from homeassistant.components.scrape.const import (
)
from homeassistant.components.sensor import (
CONF_STATE_CLASS,
DOMAIN as SENSOR_DOMAIN,
SensorDeviceClass,
SensorStateClass,
)
@@ -603,7 +604,7 @@ async def test_setup_config_entry(
entity = entity_registry.async_get("sensor.current_version")
assert entity.unique_id == "01JZN07D8D23994A49YKS649S7"
assert entity.unique_id == "3699ef88-69e6-11ed-a1eb-0242ac120002"
async def test_templates_with_yaml(hass: HomeAssistant) -> None:
@@ -687,38 +688,27 @@ async def test_templates_with_yaml(hass: HomeAssistant) -> None:
@pytest.mark.parametrize(
("get_resource_config", "get_sensor_config"),
"get_config",
[
(
{
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
"auth": {},
"advanced": {
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: 10,
CONF_ENCODING: DEFAULT_ENCODING,
},
},
(
{
CONF_RESOURCE: "https://www.home-assistant.io",
CONF_METHOD: "GET",
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
CONF_TIMEOUT: 10,
CONF_ENCODING: DEFAULT_ENCODING,
SENSOR_DOMAIN: [
{
"data": {
CONF_SELECT: ".current-version h1",
CONF_INDEX: 0,
"advanced": {
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
CONF_AVAILABILITY: '{{ states("sensor.input1")=="on" }}',
CONF_ICON: 'mdi:o{{ "n" if states("sensor.input1")=="on" else "ff" }}',
CONF_PICTURE: 'o{{ "n" if states("sensor.input1")=="on" else "ff" }}.jpg',
},
},
# "subentry_id": "01JZN07D8D23994A49YKS649S7",
"subentry_type": "entity",
"title": "Current version",
"unique_id": None,
},
),
)
CONF_SELECT: ".current-version h1",
CONF_NAME: "Current version",
CONF_VALUE_TEMPLATE: "{{ value.split(':')[1] }}",
CONF_INDEX: 0,
CONF_UNIQUE_ID: "3699ef88-69e6-11ed-a1eb-0242ac120002",
CONF_AVAILABILITY: '{{ states("sensor.input1")=="on" }}',
CONF_ICON: 'mdi:o{{ "n" if states("sensor.input1")=="on" else "ff" }}',
CONF_PICTURE: 'o{{ "n" if states("sensor.input1")=="on" else "ff" }}.jpg',
}
],
}
],
)
async def test_availability(

View File

@@ -9,12 +9,12 @@ from homeassistant.core import HomeAssistant
from tests.components.common import (
ConditionStateDescription,
assert_condition_behavior_all,
assert_condition_behavior_any,
assert_condition_gated_by_labs_flag,
create_target_condition,
parametrize_condition_states_all,
parametrize_condition_states_any,
parametrize_target_entities,
set_or_remove_state,
target_entities,
)
@@ -22,7 +22,17 @@ from tests.components.common import (
@pytest.fixture
async def target_sirens(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple siren entities associated with different targets."""
return await target_entities(hass, "siren", domain_excluded="switch")
return await target_entities(hass, "siren")
@pytest.fixture
async def target_switches(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple switch entities associated with different targets.
Note: The switches are used to ensure that only siren entities are considered
in the condition evaluation and not other toggle entities.
"""
return await target_entities(hass, "switch")
@pytest.mark.parametrize(
@@ -51,19 +61,18 @@ async def test_siren_conditions_gated_by_labs_flag(
condition="siren.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_any(
condition="siren.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
async def test_siren_state_condition_behavior_any(
hass: HomeAssistant,
target_sirens: dict[str, list[str]],
target_switches: dict[str, list[str]],
condition_target_config: dict,
entity_id: str,
entities_in_target: int,
@@ -72,17 +81,39 @@ async def test_siren_state_condition_behavior_any(
states: list[ConditionStateDescription],
) -> None:
"""Test the siren state condition with the 'any' behavior."""
await assert_condition_behavior_any(
other_entity_ids = set(target_sirens["included_entities"]) - {entity_id}
# Set all sirens, including the tested siren, to the initial state
for eid in target_sirens["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_sirens,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="any",
)
# Set state for switches to ensure that they don't impact the condition
for state in states:
for eid in target_switches["included_entities"]:
set_or_remove_state(hass, eid, state["included_state"])
await hass.async_block_till_done()
assert condition(hass) is False
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
# Check if changing other sirens also passes the condition
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
@@ -96,13 +127,11 @@ async def test_siren_state_condition_behavior_any(
condition="siren.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_all(
condition="siren.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
@@ -117,13 +146,32 @@ async def test_siren_state_condition_behavior_all(
states: list[ConditionStateDescription],
) -> None:
"""Test the siren state condition with the 'all' behavior."""
await assert_condition_behavior_all(
# Set state for two switches to ensure that they don't impact the condition
hass.states.async_set("switch.label_switch_1", STATE_OFF)
hass.states.async_set("switch.label_switch_2", STATE_ON)
other_entity_ids = set(target_sirens["included_entities"]) - {entity_id}
# Set all sirens, including the tested siren, to the initial state
for eid in target_sirens["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_sirens,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="all",
)
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true_first_entity"]
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]

View File

@@ -16,14 +16,25 @@ from tests.components.common import (
parametrize_condition_states_all,
parametrize_condition_states_any,
parametrize_target_entities,
set_or_remove_state,
target_entities,
)
@pytest.fixture
async def target_lights(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple light entities associated with different targets.
Note: The lights are used to ensure that only switch entities are considered
in the condition evaluation and not other toggle entities.
"""
return await target_entities(hass, "light")
@pytest.fixture
async def target_switches(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple switch entities associated with different targets."""
return await target_entities(hass, "switch", domain_excluded="light")
return await target_entities(hass, "switch")
@pytest.fixture
@@ -58,18 +69,17 @@ async def test_switch_conditions_gated_by_labs_flag(
condition="switch.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_any(
condition="switch.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
async def test_switch_state_condition_behavior_any(
hass: HomeAssistant,
target_lights: dict[str, list[str]],
target_switches: dict[str, list[str]],
condition_target_config: dict,
entity_id: str,
@@ -79,17 +89,39 @@ async def test_switch_state_condition_behavior_any(
states: list[ConditionStateDescription],
) -> None:
"""Test the switch state condition with the 'any' behavior."""
await assert_condition_behavior_any(
other_entity_ids = set(target_switches["included_entities"]) - {entity_id}
# Set all switches, including the tested switch, to the initial state
for eid in target_switches["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_switches,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="any",
)
# Set state for lights to ensure that they don't impact the condition
for state in states:
for eid in target_lights["included_entities"]:
set_or_remove_state(hass, eid, state["included_state"])
await hass.async_block_till_done()
assert condition(hass) is False
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
# Check if changing other lights also passes the condition
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
@@ -103,13 +135,11 @@ async def test_switch_state_condition_behavior_any(
condition="switch.is_on",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_all(
condition="switch.is_off",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
@@ -124,17 +154,37 @@ async def test_switch_state_condition_behavior_all(
states: list[ConditionStateDescription],
) -> None:
"""Test the switch state condition with the 'all' behavior."""
await assert_condition_behavior_all(
# Set state for two switches to ensure that they don't impact the condition
hass.states.async_set("switch.label_switch_1", STATE_OFF)
hass.states.async_set("switch.label_switch_2", STATE_ON)
other_entity_ids = set(target_switches["included_entities"]) - {entity_id}
# Set all switches, including the tested switch, to the initial state
for eid in target_switches["included_entities"]:
set_or_remove_state(hass, eid, states[0]["included_state"])
await hass.async_block_till_done()
condition = await create_target_condition(
hass,
target_entities=target_switches,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
target=condition_target_config,
behavior="all",
)
for state in states:
included_state = state["included_state"]
set_or_remove_state(hass, entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true_first_entity"]
for other_entity_id in other_entity_ids:
set_or_remove_state(hass, other_entity_id, included_state)
await hass.async_block_till_done()
assert condition(hass) == state["condition_true"]
CONDITION_STATES = [
*parametrize_condition_states_any(

View File

@@ -1,129 +0,0 @@
"""Test update conditions."""
from typing import Any
import pytest
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from tests.components.common import (
ConditionStateDescription,
assert_condition_behavior_all,
assert_condition_behavior_any,
assert_condition_gated_by_labs_flag,
parametrize_condition_states_all,
parametrize_condition_states_any,
parametrize_target_entities,
target_entities,
)
@pytest.fixture
async def target_updates(hass: HomeAssistant) -> dict[str, list[str]]:
"""Create multiple update entities associated with different targets."""
return await target_entities(hass, "update", domain_excluded="switch")
@pytest.mark.parametrize(
"condition",
[
"update.is_available",
"update.is_not_available",
],
)
async def test_update_conditions_gated_by_labs_flag(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture, condition: str
) -> None:
"""Test the update conditions are gated by the labs flag."""
await assert_condition_gated_by_labs_flag(hass, caplog, condition)
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("condition_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities("update"),
)
@pytest.mark.parametrize(
("condition", "condition_options", "states"),
[
*parametrize_condition_states_any(
condition="update.is_available",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_any(
condition="update.is_not_available",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
async def test_update_state_condition_behavior_any(
hass: HomeAssistant,
target_updates: dict[str, list[str]],
condition_target_config: dict,
entity_id: str,
entities_in_target: int,
condition: str,
condition_options: dict[str, Any],
states: list[ConditionStateDescription],
) -> None:
"""Test the update state condition with the 'any' behavior."""
await assert_condition_behavior_any(
hass,
target_entities=target_updates,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
)
@pytest.mark.usefixtures("enable_labs_preview_features")
@pytest.mark.parametrize(
("condition_target_config", "entity_id", "entities_in_target"),
parametrize_target_entities("update"),
)
@pytest.mark.parametrize(
("condition", "condition_options", "states"),
[
*parametrize_condition_states_all(
condition="update.is_available",
target_states=[STATE_ON],
other_states=[STATE_OFF],
excluded_entities_from_other_domain=True,
),
*parametrize_condition_states_all(
condition="update.is_not_available",
target_states=[STATE_OFF],
other_states=[STATE_ON],
excluded_entities_from_other_domain=True,
),
],
)
async def test_update_state_condition_behavior_all(
hass: HomeAssistant,
target_updates: dict[str, list[str]],
condition_target_config: dict,
entity_id: str,
entities_in_target: int,
condition: str,
condition_options: dict[str, Any],
states: list[ConditionStateDescription],
) -> None:
"""Test the update state condition with the 'all' behavior."""
await assert_condition_behavior_all(
hass,
target_entities=target_updates,
condition_target_config=condition_target_config,
entity_id=entity_id,
entities_in_target=entities_in_target,
condition=condition,
condition_options=condition_options,
states=states,
)

View File

@@ -1,12 +1,16 @@
"""Tests for the USB Discovery integration."""
from unittest.mock import patch
from unittest.mock import MagicMock, patch
from aiousbwatcher import InotifyNotAvailableError
import pytest
from homeassistant.components.usb import async_request_scan as usb_async_request_scan
from homeassistant.components.usb import (
DOMAIN,
async_request_scan as usb_async_request_scan,
)
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@pytest.fixture(name="force_usb_polling_watcher")
@@ -24,6 +28,20 @@ def patch_scanned_serial_ports(**kwargs) -> None:
return patch("homeassistant.components.usb.utils.scan_serial_ports", **kwargs)
@pytest.fixture(name="setup_usb")
async def setup_usb_fixture(
hass: HomeAssistant, force_usb_polling_watcher: None
) -> MagicMock:
"""Set up USB integration and return the scanned serial ports mock."""
with (
patch("homeassistant.components.usb.async_get_usb", return_value=[]),
patch_scanned_serial_ports(return_value=[]) as mock_serial_ports,
):
assert await async_setup_component(hass, DOMAIN, {"usb": {}})
await hass.async_block_till_done()
yield mock_serial_ports
async def async_request_scan(hass: HomeAssistant) -> None:
"""Request a USB scan."""
return await usb_async_request_scan(hass)

View File

@@ -24,10 +24,12 @@ from homeassistant.util import dt as dt_util
from . import (
force_usb_polling_watcher, # noqa: F401
patch_scanned_serial_ports,
setup_usb_fixture, # noqa: F401
)
from tests.common import (
MockModule,
MockUser,
async_fire_time_changed,
mock_config_flow,
mock_integration,
@@ -1646,3 +1648,83 @@ async def test_removal_aborts_discovery_flows(
final_flows = hass.config_entries.flow.async_progress()
assert len(final_flows) == 1
assert final_flows[0]["handler"] == "test2"
async def test_list_serial_ports(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_usb: MagicMock,
) -> None:
"""Test listing serial ports via websocket."""
setup_usb.return_value = [
USBDevice(
device="/dev/ttyUSB0",
vid="10C4",
pid="EA60",
serial_number="001234",
manufacturer="Silicon Labs",
description="CP2102 USB to UART",
),
SerialDevice(
device="/dev/ttyS0",
serial_number=None,
manufacturer=None,
description="ttyS0",
),
]
ws_client = await hass_ws_client(hass)
await ws_client.send_json({"id": 1, "type": "usb/list_serial_ports"})
response = await ws_client.receive_json()
assert response["success"]
result = response["result"]
assert len(result) == 2
assert result[0]["device"] == "/dev/ttyUSB0"
assert result[0]["vid"] == "10C4"
assert result[0]["pid"] == "EA60"
assert result[0]["serial_number"] == "001234"
assert result[0]["manufacturer"] == "Silicon Labs"
assert result[0]["description"] == "CP2102 USB to UART"
assert result[1]["device"] == "/dev/ttyS0"
assert result[1]["serial_number"] is None
assert result[1]["manufacturer"] is None
assert result[1]["description"] == "ttyS0"
assert "vid" not in result[1]
assert "pid" not in result[1]
async def test_list_serial_ports_require_admin(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
hass_admin_user: MockUser,
setup_usb: MagicMock,
) -> None:
"""Test that listing serial ports requires admin."""
hass_admin_user.groups = []
ws_client = await hass_ws_client(hass)
await ws_client.send_json({"id": 1, "type": "usb/list_serial_ports"})
response = await ws_client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "unauthorized"
async def test_list_serial_ports_os_error(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
setup_usb: MagicMock,
) -> None:
"""Test listing serial ports handles OSError."""
setup_usb.side_effect = OSError("Permission denied")
ws_client = await hass_ws_client(hass)
await ws_client.send_json({"id": 1, "type": "usb/list_serial_ports"})
response = await ws_client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "unknown_error"
assert "Permission denied" in response["error"]["message"]

View File

@@ -1122,6 +1122,22 @@ def test_state_selector_schema(schema, valid_selections, invalid_selections) ->
_test_selector("state", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize(
("schema", "valid_selections", "invalid_selections"),
[
(None, ("/dev/ttyUSB0", "/dev/ttyACM1", "COM3"), (None, 1, True)),
({}, ("/dev/ttyUSB0",), (None,)),
],
)
def test_serial_selector_schema(
schema: dict | None,
valid_selections: tuple[Any, ...],
invalid_selections: tuple[Any, ...],
) -> None:
"""Test serial selector."""
_test_selector("serial", schema, valid_selections, invalid_selections)
@pytest.mark.parametrize(
("schema", "valid_selections", "invalid_selections"),
[