Merge branch 'dev' into zimi-sensor

This commit is contained in:
markhannon
2025-05-13 21:49:38 +10:00
committed by GitHub
41 changed files with 1842 additions and 1696 deletions

View File

@@ -39,11 +39,20 @@ async def async_setup_entry(
session = async_create_clientsession(
hass, timeout=ClientTimeout(connect=10, total=12 * 60 * 60)
)
container_client = ContainerClient(
account_url=f"https://{entry.data[CONF_ACCOUNT_NAME]}.blob.core.windows.net/",
container_name=entry.data[CONF_CONTAINER_NAME],
credential=entry.data[CONF_STORAGE_ACCOUNT_KEY],
transport=AioHttpTransport(session=session),
def create_container_client() -> ContainerClient:
"""Create a ContainerClient."""
return ContainerClient(
account_url=f"https://{entry.data[CONF_ACCOUNT_NAME]}.blob.core.windows.net/",
container_name=entry.data[CONF_CONTAINER_NAME],
credential=entry.data[CONF_STORAGE_ACCOUNT_KEY],
transport=AioHttpTransport(session=session),
)
# has a blocking call to open in cpython
container_client: ContainerClient = await hass.async_add_executor_job(
create_container_client
)
try:

View File

@@ -6,5 +6,5 @@
"integration_type": "service",
"iot_class": "local_push",
"quality_scale": "internal",
"requirements": ["debugpy==1.8.13"]
"requirements": ["debugpy==1.8.14"]
}

View File

@@ -14,7 +14,7 @@
],
"quality_scale": "internal",
"requirements": [
"aiodhcpwatcher==1.1.1",
"aiodhcpwatcher==1.2.0",
"aiodiscover==2.7.0",
"cached-ipaddress==0.10.0"
]

View File

@@ -23,6 +23,8 @@ from .const import MieleAppliance
from .coordinator import MieleConfigEntry
from .entity import MieleEntity
PARALLEL_UPDATES = 0
_LOGGER = logging.getLogger(__name__)

View File

@@ -17,6 +17,8 @@ from .const import DOMAIN, PROCESS_ACTION, MieleActions, MieleAppliance
from .coordinator import MieleConfigEntry
from .entity import MieleEntity
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)

View File

@@ -26,6 +26,8 @@ from .const import DEVICE_TYPE_TAGS, DISABLED_TEMP_ENTITIES, DOMAIN, MieleApplia
from .coordinator import MieleConfigEntry, MieleDataUpdateCoordinator
from .entity import MieleEntity
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)

View File

@@ -27,6 +27,8 @@ from .const import DOMAIN, POWER_OFF, POWER_ON, VENTILATION_STEP, MieleAppliance
from .coordinator import MieleConfigEntry, MieleDataUpdateCoordinator
from .entity import MieleEntity
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)
SPEED_RANGE = (1, 4)

View File

@@ -23,6 +23,8 @@ from .const import AMBIENT_LIGHT, DOMAIN, LIGHT, LIGHT_OFF, LIGHT_ON, MieleAppli
from .coordinator import MieleConfigEntry
from .entity import MieleDevice, MieleEntity
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)

View File

@@ -32,18 +32,23 @@ rules:
Handled by a setting in manifest.json as there is no account information in API
# Silver
action-exceptions: todo
action-exceptions:
status: done
comment: No custom actions are defined
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: No configuration parameters
docs-installation-parameters: todo
docs-installation-parameters:
status: exempt
comment: |
Integration uses account linking via Nabu casa so no installation parameters are needed.
entity-unavailable: done
integration-owner: done
log-when-unavailable: todo
parallel-updates:
status: exempt
comment: Handled by coordinator
log-when-unavailable:
status: done
comment: Handled by DataUpdateCoordinator
parallel-updates: done
reauthentication-flow: done
test-coverage: todo

View File

@@ -39,6 +39,8 @@ from .const import (
from .coordinator import MieleConfigEntry, MieleDataUpdateCoordinator
from .entity import MieleEntity
PARALLEL_UPDATES = 0
_LOGGER = logging.getLogger(__name__)
DISABLED_TEMPERATURE = -32768

View File

@@ -28,6 +28,8 @@ from .const import (
from .coordinator import MieleConfigEntry
from .entity import MieleEntity
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)

View File

@@ -24,6 +24,8 @@ from .const import DOMAIN, PROCESS_ACTION, PROGRAM_ID, MieleActions, MieleApplia
from .coordinator import MieleConfigEntry
from .entity import MieleEntity
PARALLEL_UPDATES = 1
_LOGGER = logging.getLogger(__name__)
# The following const classes define program speeds and programs for the vacuum cleaner.

View File

@@ -21,26 +21,19 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.debounce import Debouncer
from .bridge import (
SamsungTVBridge,
async_get_device_info,
mac_from_device_info,
model_requires_encryption,
)
from .bridge import SamsungTVBridge, mac_from_device_info, model_requires_encryption
from .const import (
CONF_SESSION_ID,
CONF_SSDP_MAIN_TV_AGENT_LOCATION,
CONF_SSDP_RENDERING_CONTROL_LOCATION,
DOMAIN,
ENTRY_RELOAD_COOLDOWN,
LEGACY_PORT,
LOGGER,
METHOD_ENCRYPTED_WEBSOCKET,
METHOD_LEGACY,
UPNP_SVC_MAIN_TV_AGENT,
UPNP_SVC_RENDERING_CONTROL,
)
@@ -180,30 +173,10 @@ async def _async_create_bridge_with_updated_data(
"""Create a bridge object and update any missing data in the config entry."""
updated_data: dict[str, str | int] = {}
host: str = entry.data[CONF_HOST]
port: int | None = entry.data.get(CONF_PORT)
method: str | None = entry.data.get(CONF_METHOD)
method: str = entry.data[CONF_METHOD]
load_info_attempted = False
info: dict[str, Any] | None = None
if not port or not method:
LOGGER.debug("Attempting to get port or method for %s", host)
if method == METHOD_LEGACY:
port = LEGACY_PORT
else:
# When we imported from yaml we didn't setup the method
# because we didn't know it
_result, port, method, info = await async_get_device_info(hass, host)
load_info_attempted = True
if not port or not method:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="failed_to_determine_connection_method",
)
LOGGER.debug("Updated port to %s and method to %s for %s", port, method, host)
updated_data[CONF_PORT] = port
updated_data[CONF_METHOD] = method
bridge = _async_get_device_bridge(hass, {**entry.data, **updated_data})
mac: str | None = entry.data.get(CONF_MAC)

View File

@@ -56,7 +56,6 @@ from .const import (
RESULT_INVALID_PIN,
RESULT_NOT_SUPPORTED,
RESULT_SUCCESS,
RESULT_UNKNOWN_HOST,
SUCCESSFUL_RESULTS,
UPNP_SVC_MAIN_TV_AGENT,
UPNP_SVC_RENDERING_CONTROL,
@@ -252,32 +251,40 @@ class SamsungTVConfigFlow(ConfigFlow, domain=DOMAIN):
self._mac = mac
return True
async def _async_set_name_host_from_input(self, user_input: dict[str, Any]) -> None:
async def _async_set_name_host_from_input(self, user_input: dict[str, Any]) -> bool:
try:
self._host = await self.hass.async_add_executor_job(
socket.gethostbyname, user_input[CONF_HOST]
)
except socket.gaierror as err:
raise AbortFlow(RESULT_UNKNOWN_HOST) from err
LOGGER.debug("Failed to get IP for %s: %s", user_input[CONF_HOST], err)
return False
self._title = self._host
return True
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle a flow initialized by the user."""
errors: dict[str, str] | None = None
if user_input is not None:
await self._async_set_name_host_from_input(user_input)
await self._async_create_bridge()
assert self._bridge
self._async_abort_entries_match({CONF_HOST: self._host})
if self._bridge.method != METHOD_LEGACY:
# Legacy bridge does not provide device info
await self._async_set_device_unique_id(raise_on_progress=False)
if self._bridge.method == METHOD_ENCRYPTED_WEBSOCKET:
return await self.async_step_encrypted_pairing()
return await self.async_step_pairing({})
if await self._async_set_name_host_from_input(user_input):
await self._async_create_bridge()
assert self._bridge
self._async_abort_entries_match({CONF_HOST: self._host})
if self._bridge.method != METHOD_LEGACY:
# Legacy bridge does not provide device info
await self._async_set_device_unique_id(raise_on_progress=False)
if self._bridge.method == METHOD_ENCRYPTED_WEBSOCKET:
return await self.async_step_encrypted_pairing()
return await self.async_step_pairing({})
errors = {"base": "invalid_host"}
return self.async_show_form(step_id="user", data_schema=DATA_SCHEMA)
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(DATA_SCHEMA, user_input),
errors=errors,
)
async def async_step_pairing(
self, user_input: dict[str, Any] | None = None

View File

@@ -43,6 +43,7 @@
},
"error": {
"auth_missing": "[%key:component::samsungtv::config::abort::auth_missing%]",
"invalid_host": "Host is invalid, please try again.",
"invalid_pin": "PIN is invalid, please try again."
},
"abort": {
@@ -52,7 +53,6 @@
"id_missing": "This Samsung device doesn't have a SerialNumber.",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"not_supported": "This Samsung device is currently not supported.",
"unknown": "[%key:common::config_flow::error::unknown%]",
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
}
},

View File

@@ -33,7 +33,11 @@ from homeassistant.const import (
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
from homeassistant.helpers import device_registry as dr, issue_registry as ir
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC, format_mac
from homeassistant.helpers.device_registry import (
CONNECTION_BLUETOOTH,
CONNECTION_NETWORK_MAC,
format_mac,
)
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .bluetooth import async_connect_scanner
@@ -160,6 +164,11 @@ class ShellyCoordinatorBase[_DeviceT: BlockDevice | RpcDevice](
"""Sleep period of the device."""
return self.config_entry.data.get(CONF_SLEEP_PERIOD, 0)
@property
def connections(self) -> set[tuple[str, str]]:
"""Connections of the device."""
return {(CONNECTION_NETWORK_MAC, self.mac)}
def async_setup(self, pending_platforms: list[Platform] | None = None) -> None:
"""Set up the coordinator."""
self._pending_platforms = pending_platforms
@@ -167,7 +176,7 @@ class ShellyCoordinatorBase[_DeviceT: BlockDevice | RpcDevice](
device_entry = dev_reg.async_get_or_create(
config_entry_id=self.config_entry.entry_id,
name=self.name,
connections={(CONNECTION_NETWORK_MAC, self.mac)},
connections=self.connections,
identifiers={(DOMAIN, self.mac)},
manufacturer="Shelly",
model=get_shelly_model_name(self.model, self.sleep_period, self.device),
@@ -523,6 +532,14 @@ class ShellyRpcCoordinator(ShellyCoordinatorBase[RpcDevice]):
"""
return format_mac(bluetooth_mac_from_primary_mac(self.mac)).upper()
@property
def connections(self) -> set[tuple[str, str]]:
"""Connections of the device."""
connections = super().connections
if not self.sleep_period:
connections.add((CONNECTION_BLUETOOTH, self.bluetooth_source))
return connections
async def async_device_online(self, source: str) -> None:
"""Handle device going online."""
if not self.sleep_period:

View File

@@ -0,0 +1,136 @@
"""Offer sun based automation rules."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import cast
import voluptuous as vol
from homeassistant.const import CONF_CONDITION, SUN_EVENT_SUNRISE, SUN_EVENT_SUNSET
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.condition import (
ConditionCheckerType,
condition_trace_set_result,
condition_trace_update_result,
trace_condition_function,
)
from homeassistant.helpers.sun import get_astral_event_date
from homeassistant.helpers.typing import ConfigType, TemplateVarsType
from homeassistant.util import dt as dt_util
CONDITION_SCHEMA = vol.All(
vol.Schema(
{
**cv.CONDITION_BASE_SCHEMA,
vol.Required(CONF_CONDITION): "sun",
vol.Optional("before"): cv.sun_event,
vol.Optional("before_offset"): cv.time_period,
vol.Optional("after"): vol.All(
vol.Lower, vol.Any(SUN_EVENT_SUNSET, SUN_EVENT_SUNRISE)
),
vol.Optional("after_offset"): cv.time_period,
}
),
cv.has_at_least_one_key("before", "after"),
)
def sun(
hass: HomeAssistant,
before: str | None = None,
after: str | None = None,
before_offset: timedelta | None = None,
after_offset: timedelta | None = None,
) -> bool:
"""Test if current time matches sun requirements."""
utcnow = dt_util.utcnow()
today = dt_util.as_local(utcnow).date()
before_offset = before_offset or timedelta(0)
after_offset = after_offset or timedelta(0)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, today)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, today)
has_sunrise_condition = SUN_EVENT_SUNRISE in (before, after)
has_sunset_condition = SUN_EVENT_SUNSET in (before, after)
after_sunrise = today > dt_util.as_local(cast(datetime, sunrise)).date()
if after_sunrise and has_sunrise_condition:
tomorrow = today + timedelta(days=1)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, tomorrow)
after_sunset = today > dt_util.as_local(cast(datetime, sunset)).date()
if after_sunset and has_sunset_condition:
tomorrow = today + timedelta(days=1)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, tomorrow)
# Special case: before sunrise OR after sunset
# This will handle the very rare case in the polar region when the sun rises/sets
# but does not set/rise.
# However this entire condition does not handle those full days of darkness
# or light, the following should be used instead:
#
# condition:
# condition: state
# entity_id: sun.sun
# state: 'above_horizon' (or 'below_horizon')
#
if before == SUN_EVENT_SUNRISE and after == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
return utcnow < wanted_time_before or utcnow > wanted_time_after
if sunrise is None and has_sunrise_condition:
# There is no sunrise today
condition_trace_set_result(False, message="no sunrise today")
return False
if sunset is None and has_sunset_condition:
# There is no sunset today
condition_trace_set_result(False, message="no sunset today")
return False
if before == SUN_EVENT_SUNRISE:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if before == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunset) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if after == SUN_EVENT_SUNRISE:
wanted_time_after = cast(datetime, sunrise) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
if after == SUN_EVENT_SUNSET:
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
return True
def async_condition_from_config(config: ConfigType) -> ConditionCheckerType:
"""Wrap action method with sun based condition."""
before = config.get("before")
after = config.get("after")
before_offset = config.get("before_offset")
after_offset = config.get("after_offset")
@trace_condition_function
def sun_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
"""Validate time based if-condition."""
return sun(hass, before, after, before_offset, after_offset)
return sun_if

View File

@@ -95,13 +95,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
energysites: list[TeslemetryEnergyData] = []
# Create the stream
stream = TeslemetryStream(
session,
access_token,
server=f"{region.lower()}.teslemetry.com",
parse_timestamp=True,
manual=True,
)
stream: TeslemetryStream | None = None
for product in products:
if (
@@ -123,6 +117,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
serial_number=vin,
)
# Create stream if required
if not stream:
stream = TeslemetryStream(
session,
access_token,
server=f"{region.lower()}.teslemetry.com",
parse_timestamp=True,
manual=True,
)
remove_listener = stream.async_add_listener(
create_handle_vehicle_stream(vin, coordinator),
{"vin": vin},
@@ -240,7 +244,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
entry.runtime_data = TeslemetryData(vehicles, energysites, scopes)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
entry.async_create_background_task(hass, stream.listen(), "Teslemetry Stream")
if stream:
entry.async_create_background_task(hass, stream.listen(), "Teslemetry Stream")
return True

View File

@@ -65,7 +65,7 @@ def setup_platform(
name = travel_time.get(CONF_NAME) or travel_time.get(CONF_ID)
sensors.append(
WashingtonStateTravelTimeSensor(
name, config.get(CONF_API_KEY), travel_time.get(CONF_ID)
name, config[CONF_API_KEY], travel_time.get(CONF_ID)
)
)
@@ -82,20 +82,20 @@ class WashingtonStateTransportSensor(SensorEntity):
_attr_icon = ICON
def __init__(self, name, access_code):
def __init__(self, name: str, access_code: str) -> None:
"""Initialize the sensor."""
self._data = {}
self._data: dict[str, str | int | None] = {}
self._access_code = access_code
self._name = name
self._state = None
self._state: int | None = None
@property
def name(self):
def name(self) -> str:
"""Return the name of the sensor."""
return self._name
@property
def native_value(self):
def native_value(self) -> int | None:
"""Return the state of the sensor."""
return self._state
@@ -106,7 +106,7 @@ class WashingtonStateTravelTimeSensor(WashingtonStateTransportSensor):
_attr_attribution = ATTRIBUTION
_attr_native_unit_of_measurement = UnitOfTime.MINUTES
def __init__(self, name, access_code, travel_time_id):
def __init__(self, name: str, access_code: str, travel_time_id: str) -> None:
"""Construct a travel time sensor."""
self._travel_time_id = travel_time_id
WashingtonStateTransportSensor.__init__(self, name, access_code)
@@ -123,13 +123,17 @@ class WashingtonStateTravelTimeSensor(WashingtonStateTransportSensor):
_LOGGER.warning("Invalid response from WSDOT API")
else:
self._data = response.json()
self._state = self._data.get(ATTR_CURRENT_TIME)
_state = self._data.get(ATTR_CURRENT_TIME)
if not isinstance(_state, int):
self._state = None
else:
self._state = _state
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
"""Return other details about the sensor state."""
if self._data is not None:
attrs = {}
attrs: dict[str, str | int | None | datetime] = {}
for key in (
ATTR_AVG_TIME,
ATTR_NAME,
@@ -144,12 +148,15 @@ class WashingtonStateTravelTimeSensor(WashingtonStateTransportSensor):
return None
def _parse_wsdot_timestamp(timestamp):
def _parse_wsdot_timestamp(timestamp: Any) -> datetime | None:
"""Convert WSDOT timestamp to datetime."""
if not timestamp:
if not isinstance(timestamp, str):
return None
# ex: Date(1485040200000-0800)
milliseconds, tzone = re.search(r"Date\((\d+)([+-]\d\d)\d\d\)", timestamp).groups()
timestamp_parts = re.search(r"Date\((\d+)([+-]\d\d)\d\d\)", timestamp)
if timestamp_parts is None:
return None
milliseconds, tzone = timestamp_parts.groups()
return datetime.fromtimestamp(
int(milliseconds) / 1000, tz=timezone(timedelta(hours=int(tzone)))
)

View File

@@ -71,6 +71,7 @@ from homeassistant.components.websocket_api import (
ActiveConnection,
)
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
from homeassistant.const import CONF_URL
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv, device_registry as dr
from homeassistant.helpers.aiohttp_client import async_get_clientsession
@@ -88,13 +89,16 @@ from .const import (
DATA_CLIENT,
DOMAIN,
EVENT_DEVICE_ADDED_TO_REGISTRY,
LOGGER,
RESTORE_NVM_DRIVER_READY_TIMEOUT,
USER_AGENT,
)
from .helpers import (
CannotConnect,
async_enable_statistics,
async_get_node_from_device_id,
async_get_provisioning_entry_from_device_id,
async_get_version_info,
get_device_id,
)
@@ -2865,6 +2869,25 @@ async def websocket_hard_reset_controller(
async with asyncio.timeout(HARD_RESET_CONTROLLER_DRIVER_READY_TIMEOUT):
await wait_driver_ready.wait()
# When resetting the controller, the controller home id is also changed.
# The controller state in the client is stale after resetting the controller,
# so get the new home id with a new client using the helper function.
# The client state will be refreshed by reloading the config entry,
# after the unique id of the config entry has been updated.
try:
version_info = await async_get_version_info(hass, entry.data[CONF_URL])
except CannotConnect:
# Just log this error, as there's nothing to do about it here.
# The stale unique id needs to be handled by a repair flow,
# after the config entry has been reloaded.
LOGGER.error(
"Failed to get server version, cannot update config entry"
"unique id with new home id, after controller reset"
)
else:
hass.config_entries.async_update_entry(
entry, unique_id=str(version_info.home_id)
)
await hass.config_entries.async_reload(entry.entry_id)

View File

@@ -9,14 +9,13 @@ import logging
from pathlib import Path
from typing import Any
import aiohttp
from awesomeversion import AwesomeVersion
from serial.tools import list_ports
import voluptuous as vol
from zwave_js_server.client import Client
from zwave_js_server.exceptions import FailedCommand
from zwave_js_server.model.driver import Driver
from zwave_js_server.version import VersionInfo, get_server_version
from zwave_js_server.version import VersionInfo
from homeassistant.components import usb
from homeassistant.components.hassio import (
@@ -36,7 +35,6 @@ from homeassistant.const import CONF_NAME, CONF_URL
from homeassistant.core import HomeAssistant, callback
from homeassistant.data_entry_flow import AbortFlow
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.hassio import is_hassio
from homeassistant.helpers.service_info.hassio import HassioServiceInfo
from homeassistant.helpers.service_info.usb import UsbServiceInfo
@@ -69,6 +67,7 @@ from .const import (
DOMAIN,
RESTORE_NVM_DRIVER_READY_TIMEOUT,
)
from .helpers import CannotConnect, async_get_version_info
_LOGGER = logging.getLogger(__name__)
@@ -79,7 +78,6 @@ ADDON_SETUP_TIMEOUT = 5
ADDON_SETUP_TIMEOUT_ROUNDS = 40
CONF_EMULATE_HARDWARE = "emulate_hardware"
CONF_LOG_LEVEL = "log_level"
SERVER_VERSION_TIMEOUT = 10
ADDON_LOG_LEVELS = {
"error": "Error",
@@ -130,22 +128,6 @@ async def validate_input(hass: HomeAssistant, user_input: dict) -> VersionInfo:
raise InvalidInput("cannot_connect") from err
async def async_get_version_info(hass: HomeAssistant, ws_address: str) -> VersionInfo:
"""Return Z-Wave JS version info."""
try:
async with asyncio.timeout(SERVER_VERSION_TIMEOUT):
version_info: VersionInfo = await get_server_version(
ws_address, async_get_clientsession(hass)
)
except (TimeoutError, aiohttp.ClientError) as err:
# We don't want to spam the log if the add-on isn't started
# or takes a long time to start.
_LOGGER.debug("Failed to connect to Z-Wave JS server: %s", err)
raise CannotConnect from err
return version_info
def get_usb_ports() -> dict[str, str]:
"""Return a dict of USB ports and their friendly names."""
ports = list_ports.comports()
@@ -1357,10 +1339,6 @@ class ZWaveJSConfigFlow(ConfigFlow, domain=DOMAIN):
return client.driver
class CannotConnect(HomeAssistantError):
"""Indicate connection error."""
class InvalidInput(HomeAssistantError):
"""Error to indicate input data is invalid."""

View File

@@ -2,11 +2,13 @@
from __future__ import annotations
import asyncio
from collections.abc import Callable
from dataclasses import astuple, dataclass
import logging
from typing import Any, cast
import aiohttp
import voluptuous as vol
from zwave_js_server.client import Client as ZwaveClient
from zwave_js_server.const import (
@@ -25,6 +27,7 @@ from zwave_js_server.model.value import (
ValueDataType,
get_value_id_str,
)
from zwave_js_server.version import VersionInfo, get_server_version
from homeassistant.components.sensor import DOMAIN as SENSOR_DOMAIN
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
@@ -38,6 +41,7 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.group import expand_entity_ids
from homeassistant.helpers.typing import ConfigType, VolSchemaType
@@ -54,6 +58,8 @@ from .const import (
LOGGER,
)
SERVER_VERSION_TIMEOUT = 10
@dataclass
class ZwaveValueID:
@@ -568,3 +574,23 @@ def get_network_identifier_for_notification(
return f"`{config_entry.title}`, with the home ID `{home_id}`,"
return f"with the home ID `{home_id}`"
return ""
async def async_get_version_info(hass: HomeAssistant, ws_address: str) -> VersionInfo:
"""Return Z-Wave JS version info."""
try:
async with asyncio.timeout(SERVER_VERSION_TIMEOUT):
version_info: VersionInfo = await get_server_version(
ws_address, async_get_clientsession(hass)
)
except (TimeoutError, aiohttp.ClientError) as err:
# We don't want to spam the log if the add-on isn't started
# or takes a long time to start.
LOGGER.debug("Failed to connect to Z-Wave JS server: %s", err)
raise CannotConnect from err
return version_info
class CannotConnect(HomeAssistantError):
"""Indicate connection error."""

View File

@@ -42,8 +42,6 @@ from homeassistant.const import (
ENTITY_MATCH_ANY,
STATE_UNAVAILABLE,
STATE_UNKNOWN,
SUN_EVENT_SUNRISE,
SUN_EVENT_SUNSET,
WEEKDAYS,
)
from homeassistant.core import HomeAssistant, State, callback
@@ -60,7 +58,6 @@ from homeassistant.util import dt as dt_util
from homeassistant.util.async_ import run_callback_threadsafe
from . import config_validation as cv, entity_registry as er
from .sun import get_astral_event_date
from .template import Template, render_complex
from .trace import (
TraceElement,
@@ -85,7 +82,6 @@ _PLATFORM_ALIASES = {
"numeric_state": None,
"or": None,
"state": None,
"sun": None,
"template": None,
"time": None,
"trigger": None,
@@ -655,105 +651,6 @@ def state_from_config(config: ConfigType) -> ConditionCheckerType:
return if_state
def sun(
hass: HomeAssistant,
before: str | None = None,
after: str | None = None,
before_offset: timedelta | None = None,
after_offset: timedelta | None = None,
) -> bool:
"""Test if current time matches sun requirements."""
utcnow = dt_util.utcnow()
today = dt_util.as_local(utcnow).date()
before_offset = before_offset or timedelta(0)
after_offset = after_offset or timedelta(0)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, today)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, today)
has_sunrise_condition = SUN_EVENT_SUNRISE in (before, after)
has_sunset_condition = SUN_EVENT_SUNSET in (before, after)
after_sunrise = today > dt_util.as_local(cast(datetime, sunrise)).date()
if after_sunrise and has_sunrise_condition:
tomorrow = today + timedelta(days=1)
sunrise = get_astral_event_date(hass, SUN_EVENT_SUNRISE, tomorrow)
after_sunset = today > dt_util.as_local(cast(datetime, sunset)).date()
if after_sunset and has_sunset_condition:
tomorrow = today + timedelta(days=1)
sunset = get_astral_event_date(hass, SUN_EVENT_SUNSET, tomorrow)
# Special case: before sunrise OR after sunset
# This will handle the very rare case in the polar region when the sun rises/sets
# but does not set/rise.
# However this entire condition does not handle those full days of darkness
# or light, the following should be used instead:
#
# condition:
# condition: state
# entity_id: sun.sun
# state: 'above_horizon' (or 'below_horizon')
#
if before == SUN_EVENT_SUNRISE and after == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
return utcnow < wanted_time_before or utcnow > wanted_time_after
if sunrise is None and has_sunrise_condition:
# There is no sunrise today
condition_trace_set_result(False, message="no sunrise today")
return False
if sunset is None and has_sunset_condition:
# There is no sunset today
condition_trace_set_result(False, message="no sunset today")
return False
if before == SUN_EVENT_SUNRISE:
wanted_time_before = cast(datetime, sunrise) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if before == SUN_EVENT_SUNSET:
wanted_time_before = cast(datetime, sunset) + before_offset
condition_trace_update_result(wanted_time_before=wanted_time_before)
if utcnow > wanted_time_before:
return False
if after == SUN_EVENT_SUNRISE:
wanted_time_after = cast(datetime, sunrise) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
if after == SUN_EVENT_SUNSET:
wanted_time_after = cast(datetime, sunset) + after_offset
condition_trace_update_result(wanted_time_after=wanted_time_after)
if utcnow < wanted_time_after:
return False
return True
def sun_from_config(config: ConfigType) -> ConditionCheckerType:
"""Wrap action method with sun based condition."""
before = config.get("before")
after = config.get("after")
before_offset = config.get("before_offset")
after_offset = config.get("after_offset")
@trace_condition_function
def sun_if(hass: HomeAssistant, variables: TemplateVarsType = None) -> bool:
"""Validate time based if-condition."""
return sun(hass, before, after, before_offset, after_offset)
return sun_if
def template(
hass: HomeAssistant, value_template: Template, variables: TemplateVarsType = None
) -> bool:
@@ -1054,8 +951,10 @@ async def async_validate_condition_config(
return config
platform = await _async_get_condition_platform(hass, config)
if platform is not None and hasattr(platform, "async_validate_condition_config"):
return await platform.async_validate_condition_config(hass, config)
if platform is not None:
if hasattr(platform, "async_validate_condition_config"):
return await platform.async_validate_condition_config(hass, config)
return cast(ConfigType, platform.CONDITION_SCHEMA(config))
if platform is None and condition in ("numeric_state", "state"):
validator = cast(
Callable[[HomeAssistant, ConfigType], ConfigType],

View File

@@ -1090,7 +1090,7 @@ type ValueSchemas = dict[Hashable, VolSchemaType | Callable[[Any], dict[str, Any
def key_value_schemas(
key: str,
value_schemas: ValueSchemas,
default_schema: VolSchemaType | None = None,
default_schema: VolSchemaType | Callable[[Any], dict[str, Any]] | None = None,
default_description: str | None = None,
) -> Callable[[Any], dict[Hashable, Any]]:
"""Create a validator that validates based on a value for specific key.
@@ -1745,18 +1745,35 @@ BUILT_IN_CONDITIONS: ValueSchemas = {
"numeric_state": NUMERIC_STATE_CONDITION_SCHEMA,
"or": OR_CONDITION_SCHEMA,
"state": STATE_CONDITION_SCHEMA,
"sun": SUN_CONDITION_SCHEMA,
"template": TEMPLATE_CONDITION_SCHEMA,
"time": TIME_CONDITION_SCHEMA,
"trigger": TRIGGER_CONDITION_SCHEMA,
"zone": ZONE_CONDITION_SCHEMA,
}
# This is first round of validation, we don't want to mutate the config here already,
# just ensure basics as condition type and alias are there.
def _base_condition_validator(value: Any) -> Any:
vol.Schema(
{
**CONDITION_BASE_SCHEMA,
CONF_CONDITION: vol.NotIn(BUILT_IN_CONDITIONS),
},
extra=vol.ALLOW_EXTRA,
)(value)
return value
CONDITION_SCHEMA: vol.Schema = vol.Schema(
vol.Any(
vol.All(
expand_condition_shorthand,
key_value_schemas(CONF_CONDITION, BUILT_IN_CONDITIONS),
key_value_schemas(
CONF_CONDITION,
BUILT_IN_CONDITIONS,
_base_condition_validator,
),
),
dynamic_template_condition,
)
@@ -1783,7 +1800,10 @@ CONDITION_ACTION_SCHEMA: vol.Schema = vol.Schema(
key_value_schemas(
CONF_CONDITION,
BUILT_IN_CONDITIONS,
dynamic_template_condition_action,
vol.Any(
dynamic_template_condition_action,
_base_condition_validator,
),
"a list of conditions or a valid template",
),
)
@@ -1842,7 +1862,7 @@ def _base_trigger_list_flatten(triggers: list[Any]) -> list[Any]:
return flatlist
# This is first round of validation, we don't want to process the config here already,
# This is first round of validation, we don't want to mutate the config here already,
# just ensure basics as platform and ID are there.
def _base_trigger_validator(value: Any) -> Any:
_base_trigger_validator_schema(value)

View File

@@ -1,6 +1,6 @@
# Automatically generated by gen_requirements_all.py, do not edit
aiodhcpwatcher==1.1.1
aiodhcpwatcher==1.2.0
aiodiscover==2.7.0
aiodns==3.4.0
aiohasupervisor==0.3.1

4
requirements_all.txt generated
View File

@@ -214,7 +214,7 @@ aiobotocore==2.21.1
aiocomelit==0.12.1
# homeassistant.components.dhcp
aiodhcpwatcher==1.1.1
aiodhcpwatcher==1.2.0
# homeassistant.components.dhcp
aiodiscover==2.7.0
@@ -750,7 +750,7 @@ datapoint==0.9.9
dbus-fast==2.43.0
# homeassistant.components.debugpy
debugpy==1.8.13
debugpy==1.8.14
# homeassistant.components.decora_wifi
# decora-wifi==1.4

View File

@@ -18,7 +18,7 @@ pre-commit==4.0.0
pydantic==2.11.3
pylint==3.3.7
pylint-per-file-ignores==1.4.0
pipdeptree==2.25.1
pipdeptree==2.26.1
pytest-asyncio==0.26.0
pytest-aiohttp==1.1.0
pytest-cov==6.0.0

View File

@@ -202,7 +202,7 @@ aiobotocore==2.21.1
aiocomelit==0.12.1
# homeassistant.components.dhcp
aiodhcpwatcher==1.1.1
aiodhcpwatcher==1.2.0
# homeassistant.components.dhcp
aiodiscover==2.7.0
@@ -647,7 +647,7 @@ datapoint==0.9.9
dbus-fast==2.43.0
# homeassistant.components.debugpy
debugpy==1.8.13
debugpy==1.8.14
# homeassistant.components.ecovacs
deebot-client==13.1.0

View File

@@ -24,7 +24,7 @@ RUN --mount=from=ghcr.io/astral-sh/uv:0.7.1,source=/uv,target=/bin/uv \
--no-cache \
-c /usr/src/homeassistant/homeassistant/package_constraints.txt \
-r /usr/src/homeassistant/requirements.txt \
stdlib-list==0.10.0 pipdeptree==2.25.1 tqdm==4.67.1 ruff==0.11.0 \
stdlib-list==0.10.0 pipdeptree==2.26.1 tqdm==4.67.1 ruff==0.11.0 \
PyTurboJPEG==1.7.5 go2rtc-client==0.1.2 ha-ffmpeg==3.2.2 hassil==2.2.3 home-assistant-intents==2025.5.7 mutagen==1.47.0 pymicro-vad==1.0.1 pyspeex-noise==1.0.2
LABEL "name"="hassfest"

View File

@@ -82,21 +82,21 @@ async def test_form_cloud(hass: HomeAssistant, mock_setup_entry: AsyncMock) -> N
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "cloud"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud"
with (
patch("pyoverkiz.client.OverkizClient.login", return_value=True),
@@ -105,7 +105,7 @@ async def test_form_cloud(hass: HomeAssistant, mock_setup_entry: AsyncMock) -> N
return_value=MOCK_GATEWAY_RESPONSE,
),
):
await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"username": TEST_EMAIL, "password": TEST_PASSWORD},
)
@@ -125,13 +125,13 @@ async def test_form_only_cloud_supported(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER2},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud"
with (
patch("pyoverkiz.client.OverkizClient.login", return_value=True),
@@ -140,7 +140,7 @@ async def test_form_only_cloud_supported(
return_value=MOCK_GATEWAY_RESPONSE,
),
):
await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"username": TEST_EMAIL, "password": TEST_PASSWORD},
)
@@ -160,28 +160,28 @@ async def test_form_local_happy_flow(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "local"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "local"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local"
with patch.multiple(
"pyoverkiz.client.OverkizClient",
login=AsyncMock(return_value=True),
get_gateways=AsyncMock(return_value=MOCK_GATEWAY_RESPONSE),
):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"host": "gateway-1234-5678-1234.local:8443",
@@ -192,9 +192,9 @@ async def test_form_local_happy_flow(
await hass.async_block_till_done()
assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == "gateway-1234-5678-1234.local:8443"
assert result4["data"] == {
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "gateway-1234-5678-1234.local:8443"
assert result["data"] == {
"host": "gateway-1234-5678-1234.local:8443",
"token": TEST_TOKEN,
"verify_ssl": True,
@@ -227,32 +227,32 @@ async def test_form_invalid_auth_cloud(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "cloud"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud"
with patch("pyoverkiz.client.OverkizClient.login", side_effect=side_effect):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"username": TEST_EMAIL, "password": TEST_PASSWORD},
)
await hass.async_block_till_done()
assert result4["type"] is FlowResultType.FORM
assert result4["errors"] == {"base": error}
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": error}
@pytest.mark.parametrize(
@@ -283,24 +283,24 @@ async def test_form_invalid_auth_local(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "local"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "local"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local"
with patch("pyoverkiz.client.OverkizClient.login", side_effect=side_effect):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"host": TEST_HOST,
@@ -311,8 +311,8 @@ async def test_form_invalid_auth_local(
await hass.async_block_till_done()
assert result4["type"] is FlowResultType.FORM
assert result4["errors"] == {"base": error}
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": error}
@pytest.mark.parametrize(
@@ -331,25 +331,25 @@ async def test_form_invalid_cozytouch_auth(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER_COZYTOUCH},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud"
with patch("pyoverkiz.client.OverkizClient.login", side_effect=side_effect):
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"username": TEST_EMAIL, "password": TEST_PASSWORD},
)
await hass.async_block_till_done()
assert result3["type"] is FlowResultType.FORM
assert result3["errors"] == {"base": error}
assert result3["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["errors"] == {"base": error}
assert result["step_id"] == "cloud"
async def test_cloud_abort_on_duplicate_entry(
@@ -369,21 +369,21 @@ async def test_cloud_abort_on_duplicate_entry(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "cloud"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud"
with (
patch("pyoverkiz.client.OverkizClient.login", return_value=True),
@@ -392,13 +392,13 @@ async def test_cloud_abort_on_duplicate_entry(
return_value=MOCK_GATEWAY_RESPONSE,
),
):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"username": TEST_EMAIL, "password": TEST_PASSWORD},
)
assert result4["type"] is FlowResultType.ABORT
assert result4["reason"] == "already_configured"
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_local_abort_on_duplicate_entry(
@@ -425,21 +425,21 @@ async def test_local_abort_on_duplicate_entry(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "local"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "local"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local"
with patch.multiple(
"pyoverkiz.client.OverkizClient",
@@ -447,7 +447,7 @@ async def test_local_abort_on_duplicate_entry(
get_gateways=AsyncMock(return_value=MOCK_GATEWAY_RESPONSE),
get_setup_option=AsyncMock(return_value=True),
):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"host": TEST_HOST,
@@ -456,8 +456,8 @@ async def test_local_abort_on_duplicate_entry(
},
)
assert result4["type"] is FlowResultType.ABORT
assert result4["reason"] == "already_configured"
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
async def test_cloud_allow_multiple_unique_entries(
@@ -478,21 +478,21 @@ async def test_cloud_allow_multiple_unique_entries(
assert result["type"] is FlowResultType.FORM
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "cloud"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud"
with (
patch("pyoverkiz.client.OverkizClient.login", return_value=True),
@@ -501,14 +501,14 @@ async def test_cloud_allow_multiple_unique_entries(
return_value=MOCK_GATEWAY_RESPONSE,
),
):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"username": TEST_EMAIL, "password": TEST_PASSWORD},
)
assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == TEST_EMAIL
assert result4["data"] == {
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == TEST_EMAIL
assert result["data"] == {
"api_type": "cloud",
"username": TEST_EMAIL,
"password": TEST_PASSWORD,
@@ -544,7 +544,7 @@ async def test_cloud_reauth_success(hass: HomeAssistant) -> None:
return_value=MOCK_GATEWAY_RESPONSE,
),
):
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
"username": TEST_EMAIL,
@@ -552,8 +552,8 @@ async def test_cloud_reauth_success(hass: HomeAssistant) -> None:
},
)
assert result2["type"] is FlowResultType.ABORT
assert result2["reason"] == "reauth_successful"
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_successful"
assert mock_entry.data["username"] == TEST_EMAIL
assert mock_entry.data["password"] == TEST_PASSWORD2
@@ -586,7 +586,7 @@ async def test_cloud_reauth_wrong_account(hass: HomeAssistant) -> None:
return_value=MOCK_GATEWAY2_RESPONSE,
),
):
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={
"username": TEST_EMAIL,
@@ -594,8 +594,8 @@ async def test_cloud_reauth_wrong_account(hass: HomeAssistant) -> None:
},
)
assert result2["type"] is FlowResultType.ABORT
assert result2["reason"] == "reauth_wrong_account"
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reauth_wrong_account"
async def test_local_reauth_legacy(hass: HomeAssistant) -> None:
@@ -759,15 +759,15 @@ async def test_dhcp_flow(hass: HomeAssistant, mock_setup_entry: AsyncMock) -> No
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == config_entries.SOURCE_USER
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "cloud"},
)
@@ -776,7 +776,7 @@ async def test_dhcp_flow(hass: HomeAssistant, mock_setup_entry: AsyncMock) -> No
patch("pyoverkiz.client.OverkizClient.login", return_value=True),
patch("pyoverkiz.client.OverkizClient.get_gateways", return_value=None),
):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"username": TEST_EMAIL,
@@ -784,9 +784,9 @@ async def test_dhcp_flow(hass: HomeAssistant, mock_setup_entry: AsyncMock) -> No
},
)
assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == TEST_EMAIL
assert result4["data"] == {
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == TEST_EMAIL
assert result["data"] == {
"username": TEST_EMAIL,
"password": TEST_PASSWORD,
"hub": TEST_SERVER,
@@ -830,21 +830,21 @@ async def test_zeroconf_flow(hass: HomeAssistant, mock_setup_entry: AsyncMock) -
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == config_entries.SOURCE_USER
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "cloud"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "cloud"
with (
patch("pyoverkiz.client.OverkizClient.login", return_value=True),
@@ -853,14 +853,14 @@ async def test_zeroconf_flow(hass: HomeAssistant, mock_setup_entry: AsyncMock) -
return_value=MOCK_GATEWAY_RESPONSE,
),
):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"username": TEST_EMAIL, "password": TEST_PASSWORD},
)
assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == TEST_EMAIL
assert result4["data"] == {
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == TEST_EMAIL
assert result["data"] == {
"username": TEST_EMAIL,
"password": TEST_PASSWORD,
"hub": TEST_SERVER,
@@ -883,28 +883,28 @@ async def test_local_zeroconf_flow(
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == config_entries.SOURCE_USER
result2 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"hub": TEST_SERVER},
)
assert result2["type"] is FlowResultType.FORM
assert result2["step_id"] == "local_or_cloud"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local_or_cloud"
result3 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{"api_type": "local"},
)
assert result3["type"] is FlowResultType.FORM
assert result3["step_id"] == "local"
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "local"
with patch.multiple(
"pyoverkiz.client.OverkizClient",
login=AsyncMock(return_value=True),
get_gateways=AsyncMock(return_value=MOCK_GATEWAY_RESPONSE),
):
result4 = await hass.config_entries.flow.async_configure(
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
{
"host": "gateway-1234-5678-9123.local:8443",
@@ -913,11 +913,11 @@ async def test_local_zeroconf_flow(
},
)
assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == "gateway-1234-5678-9123.local:8443"
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "gateway-1234-5678-9123.local:8443"
# Verify no username/password in data
assert result4["data"] == {
assert result["data"] == {
"host": "gateway-1234-5678-9123.local:8443",
"token": TEST_TOKEN,
"verify_ssl": False,

View File

@@ -67,7 +67,7 @@ def fake_host_fixture() -> Generator[None]:
"""Patch gethostbyname."""
with patch(
"homeassistant.components.samsungtv.config_flow.socket.gethostbyname",
return_value="fake_host",
return_value="10.20.43.21",
):
yield

View File

@@ -2,6 +2,7 @@
from copy import deepcopy
from ipaddress import ip_address
import socket
from unittest.mock import ANY, AsyncMock, Mock, call, patch
import pytest
@@ -17,7 +18,10 @@ from websockets import frames
from websockets.exceptions import ConnectionClosedError, WebSocketException
from homeassistant import config_entries
from homeassistant.components.samsungtv.config_flow import SamsungTVConfigFlow
from homeassistant.components.samsungtv.config_flow import (
SamsungTVConfigFlow,
_strip_uuid,
)
from homeassistant.components.samsungtv.const import (
CONF_MANUFACTURER,
CONF_SESSION_ID,
@@ -45,6 +49,7 @@ from homeassistant.const import (
)
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import BaseServiceInfo, FlowResultType
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
from homeassistant.helpers.service_info.ssdp import (
ATTR_UPNP_FRIENDLY_NAME,
@@ -102,35 +107,22 @@ AUTODETECT_LEGACY = {
"id": "ha.component.samsung",
"method": METHOD_LEGACY,
"port": LEGACY_PORT,
"host": "fake_host",
"host": "10.20.43.21",
"timeout": TIMEOUT_REQUEST,
}
AUTODETECT_WEBSOCKET_PLAIN = {
"host": "fake_host",
"name": "HomeAssistant",
"port": 8001,
"timeout": TIMEOUT_REQUEST,
"token": None,
}
AUTODETECT_WEBSOCKET_SSL = {
"host": "fake_host",
"host": "10.20.43.21",
"name": "HomeAssistant",
"port": 8002,
"timeout": TIMEOUT_REQUEST,
"token": None,
}
DEVICEINFO_WEBSOCKET_SSL = {
"host": "fake_host",
"host": "10.20.43.21",
"session": ANY,
"port": 8002,
"timeout": TIMEOUT_WEBSOCKET,
}
DEVICEINFO_WEBSOCKET_NO_SSL = {
"host": "fake_host",
"session": ANY,
"port": 8001,
"timeout": TIMEOUT_WEBSOCKET,
}
pytestmark = pytest.mark.usefixtures("mock_setup_entry")
@@ -145,14 +137,27 @@ async def test_user_legacy(hass: HomeAssistant) -> None:
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
# entry was added
# Wrong host allow to retry
with patch(
"homeassistant.components.samsungtv.config_flow.socket.gethostbyname",
side_effect=socket.gaierror("[Error -2] Name or Service not known"),
):
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "invalid_host"}
# Good host creates entry
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=MOCK_USER_DATA
)
# legacy tv entry created
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "fake_host"
assert result["data"][CONF_HOST] == "fake_host"
assert result["title"] == "10.20.43.21"
assert result["data"][CONF_HOST] == "10.20.43.21"
assert result["data"][CONF_METHOD] == METHOD_LEGACY
assert result["data"][CONF_MANUFACTURER] == DEFAULT_MANUFACTURER
assert result["data"][CONF_MODEL] is None
@@ -185,8 +190,8 @@ async def test_user_legacy_does_not_ok_first_time(hass: HomeAssistant) -> None:
# legacy tv entry created
assert result3["type"] is FlowResultType.CREATE_ENTRY
assert result3["title"] == "fake_host"
assert result3["data"][CONF_HOST] == "fake_host"
assert result3["title"] == "10.20.43.21"
assert result3["data"][CONF_HOST] == "10.20.43.21"
assert result3["data"][CONF_METHOD] == METHOD_LEGACY
assert result3["data"][CONF_MANUFACTURER] == DEFAULT_MANUFACTURER
assert result3["data"][CONF_MODEL] is None
@@ -215,7 +220,7 @@ async def test_user_websocket(hass: HomeAssistant) -> None:
# websocket tv entry created
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Living Room (82GXARRS)"
assert result["data"][CONF_HOST] == "fake_host"
assert result["data"][CONF_HOST] == "10.20.43.21"
assert result["data"][CONF_METHOD] == "websocket"
assert result["data"][CONF_MANUFACTURER] == "Samsung"
assert result["data"][CONF_MODEL] == "82GXARRS"
@@ -263,7 +268,7 @@ async def test_user_encrypted_websocket(
assert result4["type"] is FlowResultType.CREATE_ENTRY
assert result4["title"] == "TV-UE48JU6470 (UE48JU6400)"
assert result4["data"][CONF_HOST] == "fake_host"
assert result4["data"][CONF_HOST] == "10.20.43.21"
assert result4["data"][CONF_MAC] == "aa:bb:aa:aa:aa:aa"
assert result4["data"][CONF_MANUFACTURER] == "Samsung"
assert result4["data"][CONF_MODEL] == "UE48JU6400"
@@ -394,7 +399,7 @@ async def test_user_websocket_auth_retry(hass: HomeAssistant) -> None:
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == "Living Room (82GXARRS)"
assert result["data"][CONF_HOST] == "fake_host"
assert result["data"][CONF_HOST] == "10.20.43.21"
assert result["data"][CONF_MANUFACTURER] == "Samsung"
assert result["data"][CONF_MODEL] == "82GXARRS"
assert result["result"].unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4"
@@ -2001,11 +2006,9 @@ async def test_update_incorrect_udn_matching_mac_unique_id_added_from_ssdp(
assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4"
@pytest.mark.usefixtures(
"remote_websocket", "rest_api", "remote_encrypted_websocket_failing"
)
@pytest.mark.usefixtures("remote_websocket")
async def test_update_incorrect_udn_matching_mac_from_dhcp(
hass: HomeAssistant, mock_setup_entry: AsyncMock
hass: HomeAssistant, rest_api: Mock, mock_setup_entry: AsyncMock
) -> None:
"""Test that DHCP updates the wrong udn from ssdp via mac match."""
entry = MockConfigEntry(
@@ -2016,6 +2019,12 @@ async def test_update_incorrect_udn_matching_mac_from_dhcp(
)
entry.add_to_hass(hass)
assert entry.data[CONF_HOST] == MOCK_DHCP_DATA.ip
assert entry.data[CONF_MAC] == dr.format_mac(
rest_api.rest_device_info.return_value["device"]["wifiMac"]
)
assert entry.unique_id != _strip_uuid(rest_api.rest_device_info.return_value["id"])
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
@@ -2026,15 +2035,14 @@ async def test_update_incorrect_udn_matching_mac_from_dhcp(
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "already_configured"
assert entry.data[CONF_MAC] == "aa:bb:aa:aa:aa:aa"
# Same IP + same MAC => unique id updated
assert entry.unique_id == "be9554b9-c9fb-41f4-8920-22da015376a4"
@pytest.mark.usefixtures(
"remote_websocket", "rest_api", "remote_encrypted_websocket_failing"
)
@pytest.mark.usefixtures("remote_websocket")
async def test_no_update_incorrect_udn_not_matching_mac_from_dhcp(
hass: HomeAssistant, mock_setup_entry: AsyncMock
hass: HomeAssistant, rest_api: Mock, mock_setup_entry: AsyncMock
) -> None:
"""Test that DHCP does not update the wrong udn from ssdp via host match."""
entry = MockConfigEntry(
@@ -2045,6 +2053,12 @@ async def test_no_update_incorrect_udn_not_matching_mac_from_dhcp(
)
entry.add_to_hass(hass)
assert entry.data[CONF_HOST] == MOCK_DHCP_DATA.ip
assert entry.data[CONF_MAC] != dr.format_mac(
rest_api.rest_device_info.return_value["device"]["wifiMac"]
)
assert entry.unique_id != _strip_uuid(rest_api.rest_device_info.return_value["id"])
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_DHCP},
@@ -2055,7 +2069,8 @@ async def test_no_update_incorrect_udn_not_matching_mac_from_dhcp(
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "confirm"
assert entry.data[CONF_MAC] == "aa:bb:ss:ss:dd:pp"
# Same IP + different MAC => unique id not updated
assert entry.unique_id == "0d1cef00-00dc-1000-9c80-4844f7b172de"

View File

@@ -1,6 +1,5 @@
"""Tests for the Samsung TV Integration."""
from typing import Any
from unittest.mock import AsyncMock, Mock, patch
import pytest
@@ -16,8 +15,6 @@ from homeassistant.components.samsungtv.const import (
CONF_SSDP_MAIN_TV_AGENT_LOCATION,
CONF_SSDP_RENDERING_CONTROL_LOCATION,
DOMAIN,
LEGACY_PORT,
METHOD_LEGACY,
METHOD_WEBSOCKET,
UPNP_SVC_MAIN_TV_AGENT,
UPNP_SVC_RENDERING_CONTROL,
@@ -53,6 +50,7 @@ MOCK_CONFIG = {
CONF_HOST: "fake_host",
CONF_NAME: "fake_name",
CONF_METHOD: METHOD_WEBSOCKET,
CONF_PORT: 8001,
}
@@ -78,42 +76,6 @@ async def test_setup(hass: HomeAssistant) -> None:
)
async def test_setup_without_port_device_offline(hass: HomeAssistant) -> None:
"""Test import from yaml when the device is offline."""
with (
patch("homeassistant.components.samsungtv.bridge.Remote", side_effect=OSError),
patch(
"homeassistant.components.samsungtv.bridge.SamsungTVEncryptedWSAsyncRemote.start_listening",
side_effect=OSError,
),
patch(
"homeassistant.components.samsungtv.bridge.SamsungTVWSAsyncRemote.open",
side_effect=OSError,
),
patch(
"homeassistant.components.samsungtv.bridge.SamsungTVWSBridge.async_device_info",
return_value=None,
),
):
await setup_samsungtv_entry(hass, MOCK_CONFIG)
config_entries_domain = hass.config_entries.async_entries(DOMAIN)
assert len(config_entries_domain) == 1
assert config_entries_domain[0].state is ConfigEntryState.SETUP_RETRY
@pytest.mark.usefixtures(
"remote_websocket", "remote_encrypted_websocket_failing", "rest_api"
)
async def test_setup_without_port_device_online(hass: HomeAssistant) -> None:
"""Test import from yaml when the device is online."""
await setup_samsungtv_entry(hass, MOCK_CONFIG)
config_entries_domain = hass.config_entries.async_entries(DOMAIN)
assert len(config_entries_domain) == 1
assert config_entries_domain[0].data[CONF_MAC] == "aa:bb:aa:aa:aa:aa"
@pytest.mark.usefixtures("remote_websocket", "remote_encrypted_websocket_failing")
async def test_setup_h_j_model(
hass: HomeAssistant, rest_api: Mock, caplog: pytest.LogCaptureFixture
@@ -182,29 +144,6 @@ async def test_reauth_triggered_encrypted(hass: HomeAssistant) -> None:
assert len(flows_in_progress) == 1
@pytest.mark.usefixtures(
"remote_legacy", "remote_encrypted_websocket_failing", "rest_api_failing"
)
@pytest.mark.parametrize(
"entry_data",
[
{CONF_HOST: "1.2.3.4"}, # Missing port/method
{CONF_HOST: "1.2.3.4", CONF_PORT: LEGACY_PORT}, # Missing method
{CONF_HOST: "1.2.3.4", CONF_METHOD: METHOD_LEGACY}, # Missing port
],
)
async def test_update_imported_legacy(
hass: HomeAssistant, entry_data: dict[str, Any]
) -> None:
"""Test updating an imported legacy entry."""
await setup_samsungtv_entry(hass, entry_data)
entries = hass.config_entries.async_entries(DOMAIN)
assert len(entries) == 1
assert entries[0].data[CONF_METHOD] == METHOD_LEGACY
assert entries[0].data[CONF_PORT] == LEGACY_PORT
@pytest.mark.usefixtures("remote_websocket", "rest_api")
async def test_incorrectly_formatted_mac_fixed(hass: HomeAssistant) -> None:
"""Test incorrectly formatted mac is corrected."""

View File

@@ -1078,3 +1078,21 @@ async def test_xmod_model_lookup(
)
assert device
assert device.model == xmod_model
async def test_device_entry_bt_address(
hass: HomeAssistant,
device_registry: dr.DeviceRegistry,
mock_rpc_device: Mock,
) -> None:
"""Check if BT address is added to device entry connections."""
entry = await init_integration(hass, 2)
device = device_registry.async_get_device(
identifiers={(DOMAIN, entry.entry_id)},
connections={(dr.CONNECTION_NETWORK_MAC, dr.format_mac(entry.unique_id))},
)
assert device
assert len(device.connections) == 2
assert (dr.CONNECTION_BLUETOOTH, "12:34:56:78:9A:BE") in device.connections

File diff suppressed because it is too large Load Diff

View File

@@ -2529,9 +2529,8 @@ async def test_validate_config_works(
"state": "paulus",
},
(
"Unexpected value for condition: 'non_existing'. Expected and, device,"
" not, numeric_state, or, state, sun, template, time, trigger, zone "
"@ data[0]"
"Invalid condition \"non_existing\" specified {'condition': "
"'non_existing', 'entity_id': 'hello.world', 'state': 'paulus'}"
),
),
# Raises HomeAssistantError

View File

@@ -1,6 +1,7 @@
"""Provide common Z-Wave JS fixtures."""
import asyncio
from collections.abc import Generator
import copy
import io
from typing import Any, cast
@@ -15,6 +16,7 @@ from zwave_js_server.version import VersionInfo
from homeassistant.components.zwave_js import PLATFORMS
from homeassistant.components.zwave_js.const import DOMAIN
from homeassistant.components.zwave_js.helpers import SERVER_VERSION_TIMEOUT
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.util.json import JsonArrayType
@@ -587,6 +589,44 @@ def mock_client_fixture(
yield client
@pytest.fixture(name="server_version_side_effect")
def server_version_side_effect_fixture() -> Any | None:
"""Return the server version side effect."""
return None
@pytest.fixture(name="get_server_version", autouse=True)
def mock_get_server_version(
server_version_side_effect: Any | None, server_version_timeout: int
) -> Generator[AsyncMock]:
"""Mock server version."""
version_info = VersionInfo(
driver_version="mock-driver-version",
server_version="mock-server-version",
home_id=1234,
min_schema_version=0,
max_schema_version=1,
)
with (
patch(
"homeassistant.components.zwave_js.helpers.get_server_version",
side_effect=server_version_side_effect,
return_value=version_info,
) as mock_version,
patch(
"homeassistant.components.zwave_js.helpers.SERVER_VERSION_TIMEOUT",
new=server_version_timeout,
),
):
yield mock_version
@pytest.fixture(name="server_version_timeout")
def mock_server_version_timeout() -> int:
"""Patch the timeout for getting server version."""
return SERVER_VERSION_TIMEOUT
@pytest.fixture(name="multisensor_6")
def multisensor_6_fixture(client, multisensor_6_state) -> Node:
"""Mock a multisensor 6 node."""

View File

@@ -7,6 +7,7 @@ import json
from typing import Any
from unittest.mock import AsyncMock, MagicMock, PropertyMock, call, patch
from aiohttp import ClientError
import pytest
from zwave_js_server.const import (
ExclusionStrategy,
@@ -5096,14 +5097,17 @@ async def test_subscribe_node_statistics(
async def test_hard_reset_controller(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
device_registry: dr.DeviceRegistry,
client: MagicMock,
get_server_version: AsyncMock,
integration: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Test that the hard_reset_controller WS API call works."""
entry = integration
ws_client = await hass_ws_client(hass)
assert entry.unique_id == "3245146787"
async def async_send_command_driver_ready(
message: dict[str, Any],
@@ -5138,6 +5142,40 @@ async def test_hard_reset_controller(
assert client.async_send_command.call_args_list[0] == call(
{"command": "driver.hard_reset"}, 25
)
assert entry.unique_id == "1234"
client.async_send_command.reset_mock()
# Test client connect error when getting the server version.
get_server_version.side_effect = ClientError("Boom!")
await ws_client.send_json_auto_id(
{
TYPE: "zwave_js/hard_reset_controller",
ENTRY_ID: entry.entry_id,
}
)
msg = await ws_client.receive_json()
device = device_registry.async_get_device(
identifiers={get_device_id(client.driver, client.driver.controller.nodes[1])}
)
assert device is not None
assert msg["result"] == device.id
assert msg["success"]
assert client.async_send_command.call_count == 3
# The first call is the relevant hard reset command.
# 25 is the require_schema parameter.
assert client.async_send_command.call_args_list[0] == call(
{"command": "driver.hard_reset"}, 25
)
assert (
"Failed to get server version, cannot update config entry"
"unique id with new home id, after controller reset"
) in caplog.text
client.async_send_command.reset_mock()
@@ -5178,6 +5216,8 @@ async def test_hard_reset_controller(
{"command": "driver.hard_reset"}, 25
)
client.async_send_command.reset_mock()
# Test FailedZWaveCommand is caught
with patch(
"zwave_js_server.model.driver.Driver.async_hard_reset",

View File

@@ -17,8 +17,9 @@ from zwave_js_server.exceptions import FailedCommand
from zwave_js_server.version import VersionInfo
from homeassistant import config_entries, data_entry_flow
from homeassistant.components.zwave_js.config_flow import SERVER_VERSION_TIMEOUT, TITLE
from homeassistant.components.zwave_js.config_flow import TITLE
from homeassistant.components.zwave_js.const import ADDON_SLUG, CONF_USB_PATH, DOMAIN
from homeassistant.components.zwave_js.helpers import SERVER_VERSION_TIMEOUT
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.service_info.hassio import HassioServiceInfo
@@ -95,44 +96,6 @@ def mock_supervisor_fixture() -> Generator[None]:
yield
@pytest.fixture(name="server_version_side_effect")
def server_version_side_effect_fixture() -> Any | None:
"""Return the server version side effect."""
return None
@pytest.fixture(name="get_server_version", autouse=True)
def mock_get_server_version(
server_version_side_effect: Any | None, server_version_timeout: int
) -> Generator[AsyncMock]:
"""Mock server version."""
version_info = VersionInfo(
driver_version="mock-driver-version",
server_version="mock-server-version",
home_id=1234,
min_schema_version=0,
max_schema_version=1,
)
with (
patch(
"homeassistant.components.zwave_js.config_flow.get_server_version",
side_effect=server_version_side_effect,
return_value=version_info,
) as mock_version,
patch(
"homeassistant.components.zwave_js.config_flow.SERVER_VERSION_TIMEOUT",
new=server_version_timeout,
),
):
yield mock_version
@pytest.fixture(name="server_version_timeout")
def mock_server_version_timeout() -> int:
"""Patch the timeout for getting server version."""
return SERVER_VERSION_TIMEOUT
@pytest.fixture(name="addon_setup_time", autouse=True)
def mock_addon_setup_time() -> Generator[None]:
"""Mock add-on setup sleep time."""

File diff suppressed because it is too large Load Diff

View File

@@ -1460,11 +1460,6 @@ def test_key_value_schemas_with_default() -> None:
[
({"delay": "{{ invalid"}, "should be format 'HH:MM'"),
({"wait_template": "{{ invalid"}, "invalid template"),
({"condition": "invalid"}, "Unexpected value for condition: 'invalid'"),
(
{"condition": "not", "conditions": {"condition": "invalid"}},
"Unexpected value for condition: 'invalid'",
),
# The validation error message could be improved to explain that this is not
# a valid shorthand template
(
@@ -1496,7 +1491,7 @@ def test_key_value_schemas_with_default() -> None:
)
@pytest.mark.usefixtures("hass")
def test_script(caplog: pytest.LogCaptureFixture, config: dict, error: str) -> None:
"""Test script validation is user friendly."""
"""Test script action validation is user friendly."""
with pytest.raises(vol.Invalid, match=error):
cv.script_action(config)