Compare commits

...

11 Commits

Author SHA1 Message Date
Stefan Agner
9ddefaaacd Bump aiohasupervisor to 0.4.2 (#165854) 2026-03-17 23:08:57 +01:00
Ludovic BOUÉ
5c8df048b1 Fix timezone in account creation date in test snapshot (#165831) 2026-03-17 22:53:36 +01:00
Raj Laud
d86d85ec56 Fix victron_ble charger error sensor always showing unknown (#165713)
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-03-17 21:45:51 +00:00
tronikos
660f12b683 Implement dynamic-devices and stale-devices in Opower to mark it platinum (#165121)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-03-17 22:43:57 +01:00
Jan Bouwhuis
b8238c86e6 Cleanup unused vacuum test helpers (#165851) 2026-03-17 22:36:24 +01:00
Raman Gupta
754828188e Refactor Vizio integration to use DataUpdateCoordinator (#162188)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 17:20:01 -04:00
Erik Montnemery
6992a3c72b Adjust name and docstring of some trigger tests (#165846) 2026-03-17 22:11:32 +01:00
Joost Lekkerkerker
738d4f662a Bump pySmartThings to 3.7.2 (#165810) 2026-03-17 21:57:20 +01:00
Carlos Sánchez López
7f33ac72ab Add alarm control panel support for Tuya WG2 alarm panel (Duosmart C30) (#165837) 2026-03-17 21:44:57 +01:00
Carlos Sánchez López
0891d814fa Add sensor support for Tuya WG2 alarm panel (Duosmart C30) (#165834) 2026-03-17 21:42:20 +01:00
Carlos Sánchez López
ddab50edcc Add binary sensor support for Tuya WG2 alarm panel (Duosmart C30) (#165833) 2026-03-17 21:41:57 +01:00
32 changed files with 1084 additions and 472 deletions

View File

@@ -6,6 +6,6 @@
"documentation": "https://www.home-assistant.io/integrations/hassio",
"iot_class": "local_polling",
"quality_scale": "internal",
"requirements": ["aiohasupervisor==0.4.1"],
"requirements": ["aiohasupervisor==0.4.2"],
"single_config_entry": true
}

View File

@@ -8,6 +8,6 @@
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["opower"],
"quality_scale": "silver",
"quality_scale": "platinum",
"requirements": ["opower==0.17.0"]
}

View File

@@ -58,7 +58,7 @@ rules:
docs-supported-functions: done
docs-troubleshooting: done
docs-use-cases: done
dynamic-devices: todo
dynamic-devices: done
entity-category: done
entity-device-class: done
entity-disabled-by-default: done
@@ -71,7 +71,7 @@ rules:
status: exempt
comment: The integration has no user-configurable options that are not authentication-related.
repair-issues: done
stale-devices: todo
stale-devices: done
# Platinum
async-dependency: done

View File

@@ -15,7 +15,8 @@ from homeassistant.components.sensor import (
SensorStateClass,
)
from homeassistant.const import EntityCategory, UnitOfEnergy, UnitOfVolume
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.typing import StateType
@@ -207,48 +208,102 @@ async def async_setup_entry(
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Opower sensor."""
coordinator = entry.runtime_data
entities: list[OpowerSensor] = []
opower_data_list = coordinator.data.values()
for opower_data in opower_data_list:
account = opower_data.account
forecast = opower_data.forecast
device_id = (
f"{coordinator.api.utility.subdomain()}_{account.utility_account_id}"
)
device = DeviceInfo(
identifiers={(DOMAIN, device_id)},
name=f"{account.meter_type.name} account {account.utility_account_id}",
manufacturer="Opower",
model=coordinator.api.utility.name(),
entry_type=DeviceEntryType.SERVICE,
)
sensors: tuple[OpowerEntityDescription, ...] = COMMON_SENSORS
if (
account.meter_type == MeterType.ELEC
and forecast is not None
and forecast.unit_of_measure == UnitOfMeasure.KWH
):
sensors += ELEC_SENSORS
elif (
account.meter_type == MeterType.GAS
and forecast is not None
and forecast.unit_of_measure in [UnitOfMeasure.THERM, UnitOfMeasure.CCF]
):
sensors += GAS_SENSORS
entities.extend(
OpowerSensor(
coordinator,
sensor,
account.utility_account_id,
device,
device_id,
)
for sensor in sensors
)
created_sensors: set[tuple[str, str]] = set()
async_add_entities(entities)
@callback
def _update_entities() -> None:
"""Update entities."""
new_entities: list[OpowerSensor] = []
current_account_device_ids: set[str] = set()
current_account_ids: set[str] = set()
for opower_data in coordinator.data.values():
account = opower_data.account
forecast = opower_data.forecast
device_id = (
f"{coordinator.api.utility.subdomain()}_{account.utility_account_id}"
)
current_account_device_ids.add(device_id)
current_account_ids.add(account.utility_account_id)
device = DeviceInfo(
identifiers={(DOMAIN, device_id)},
name=f"{account.meter_type.name} account {account.utility_account_id}",
manufacturer="Opower",
model=coordinator.api.utility.name(),
entry_type=DeviceEntryType.SERVICE,
)
sensors: tuple[OpowerEntityDescription, ...] = COMMON_SENSORS
if (
account.meter_type == MeterType.ELEC
and forecast is not None
and forecast.unit_of_measure == UnitOfMeasure.KWH
):
sensors += ELEC_SENSORS
elif (
account.meter_type == MeterType.GAS
and forecast is not None
and forecast.unit_of_measure in [UnitOfMeasure.THERM, UnitOfMeasure.CCF]
):
sensors += GAS_SENSORS
for sensor in sensors:
sensor_key = (account.utility_account_id, sensor.key)
if sensor_key in created_sensors:
continue
created_sensors.add(sensor_key)
new_entities.append(
OpowerSensor(
coordinator,
sensor,
account.utility_account_id,
device,
device_id,
)
)
if new_entities:
async_add_entities(new_entities)
# Remove any registered devices not in the current coordinator data
device_registry = dr.async_get(hass)
entity_registry = er.async_get(hass)
for device_entry in dr.async_entries_for_config_entry(
device_registry, entry.entry_id
):
device_domain_ids = {
identifier[1]
for identifier in device_entry.identifiers
if identifier[0] == DOMAIN
}
if not device_domain_ids:
# This device has no Opower identifiers; it may be a merged/shared
# device owned by another integration. Do not alter it here.
continue
if not device_domain_ids.isdisjoint(current_account_device_ids):
continue # device is still active
# Device is stale — remove its entities then detach it
for entity_entry in er.async_entries_for_device(
entity_registry, device_entry.id, include_disabled_entities=True
):
if entity_entry.config_entry_id != entry.entry_id:
continue
entity_registry.async_remove(entity_entry.entity_id)
device_registry.async_update_device(
device_entry.id, remove_config_entry_id=entry.entry_id
)
# Prune sensor tracking for accounts that are no longer present
if created_sensors:
stale_sensor_keys = {
sensor_key
for sensor_key in created_sensors
if sensor_key[0] not in current_account_ids
}
if stale_sensor_keys:
created_sensors.difference_update(stale_sensor_keys)
_update_entities()
entry.async_on_unload(coordinator.async_add_listener(_update_entities))
class OpowerSensor(CoordinatorEntity[OpowerCoordinator], SensorEntity):
@@ -272,6 +327,11 @@ class OpowerSensor(CoordinatorEntity[OpowerCoordinator], SensorEntity):
self._attr_device_info = device
self.utility_account_id = utility_account_id
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and self.utility_account_id in self.coordinator.data
@property
def native_value(self) -> StateType | date | datetime:
"""Return the state."""

View File

@@ -38,5 +38,5 @@
"iot_class": "cloud_push",
"loggers": ["pysmartthings"],
"quality_scale": "bronze",
"requirements": ["pysmartthings==3.7.0"]
"requirements": ["pysmartthings==3.7.2"]
}

View File

@@ -35,7 +35,13 @@ ALARM: dict[DeviceCategory, tuple[AlarmControlPanelEntityDescription, ...]] = {
key=DPCode.MASTER_MODE,
name="Alarm",
),
)
),
DeviceCategory.WG2: (
AlarmControlPanelEntityDescription(
key=DPCode.MASTER_MODE,
name="Alarm",
),
),
}
_TUYA_TO_HA_STATE_MAPPINGS = {

View File

@@ -317,6 +317,11 @@ BINARY_SENSORS: dict[DeviceCategory, tuple[TuyaBinarySensorEntityDescription, ..
entity_category=EntityCategory.DIAGNOSTIC,
on_value="alarm",
),
TuyaBinarySensorEntityDescription(
key=DPCode.CHARGE_STATE,
device_class=BinarySensorDeviceClass.BATTERY_CHARGING,
entity_category=EntityCategory.DIAGNOSTIC,
),
),
DeviceCategory.WK: (
TuyaBinarySensorEntityDescription(

View File

@@ -1233,6 +1233,7 @@ SENSORS: dict[DeviceCategory, tuple[TuyaSensorEntityDescription, ...]] = {
),
*BATTERY_SENSORS,
),
DeviceCategory.WG2: (*BATTERY_SENSORS,),
DeviceCategory.WK: (*BATTERY_SENSORS,),
DeviceCategory.WKCZ: (
TuyaSensorEntityDescription(

View File

@@ -148,7 +148,10 @@ def error_to_state(value: float | str | None) -> str | None:
"network_c": "network",
"network_d": "network",
}
return value_map.get(value)
mapped = value_map.get(value)
if mapped is not None:
return mapped
return value if isinstance(value, str) and value in CHARGER_ERROR_OPTIONS else None
DEVICE_STATE_OPTIONS = [

View File

@@ -2,20 +2,34 @@
from __future__ import annotations
from typing import Any
from pyvizio import VizioAsync
from homeassistant.components.media_player import MediaPlayerDeviceClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DEVICE_CLASS, Platform
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_DEVICE_CLASS,
CONF_HOST,
CONF_NAME,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.storage import Store
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey
from .const import CONF_APPS, DOMAIN
from .coordinator import VizioAppsDataUpdateCoordinator
from .const import DEFAULT_TIMEOUT, DEVICE_ID, DOMAIN, VIZIO_DEVICE_CLASSES
from .coordinator import (
VizioAppsDataUpdateCoordinator,
VizioConfigEntry,
VizioDeviceCoordinator,
VizioRuntimeData,
)
from .services import async_setup_services
DATA_APPS: HassKey[VizioAppsDataUpdateCoordinator] = HassKey(f"{DOMAIN}_apps")
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
PLATFORMS = [Platform.MEDIA_PLAYER]
@@ -26,38 +40,54 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: VizioConfigEntry) -> bool:
"""Load the saved entities."""
host = entry.data[CONF_HOST]
token = entry.data.get(CONF_ACCESS_TOKEN)
device_class = entry.data[CONF_DEVICE_CLASS]
hass.data.setdefault(DOMAIN, {})
if (
CONF_APPS not in hass.data[DOMAIN]
and entry.data[CONF_DEVICE_CLASS] == MediaPlayerDeviceClass.TV
):
store: Store[list[dict[str, Any]]] = Store(hass, 1, DOMAIN)
coordinator = VizioAppsDataUpdateCoordinator(hass, store)
await coordinator.async_setup()
hass.data[DOMAIN][CONF_APPS] = coordinator
await coordinator.async_refresh()
# Create device
device = VizioAsync(
DEVICE_ID,
host,
entry.data[CONF_NAME],
auth_token=token,
device_type=VIZIO_DEVICE_CLASSES[device_class],
session=async_get_clientsession(hass, False),
timeout=DEFAULT_TIMEOUT,
)
# Create device coordinator
device_coordinator = VizioDeviceCoordinator(hass, entry, device)
await device_coordinator.async_config_entry_first_refresh()
# Create apps coordinator for TVs (shared across entries)
if device_class == MediaPlayerDeviceClass.TV and DATA_APPS not in hass.data:
apps_coordinator = VizioAppsDataUpdateCoordinator(hass, Store(hass, 1, DOMAIN))
await apps_coordinator.async_setup()
hass.data[DATA_APPS] = apps_coordinator
await apps_coordinator.async_refresh()
entry.runtime_data = VizioRuntimeData(
device_coordinator=device_coordinator,
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
async def async_unload_entry(hass: HomeAssistant, entry: VizioConfigEntry) -> bool:
"""Unload a config entry."""
unload_ok = await hass.config_entries.async_unload_platforms(
config_entry, PLATFORMS
)
if not any(
entry.data[CONF_DEVICE_CLASS] == MediaPlayerDeviceClass.TV
for entry in hass.config_entries.async_loaded_entries(DOMAIN)
):
if coordinator := hass.data[DOMAIN].pop(CONF_APPS, None):
await coordinator.async_shutdown()
unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
if not hass.data[DOMAIN]:
hass.data.pop(DOMAIN)
# Clean up apps coordinator if no TV entries remain
if unload_ok and not any(
e.data[CONF_DEVICE_CLASS] == MediaPlayerDeviceClass.TV
for e in hass.config_entries.async_loaded_entries(DOMAIN)
if e.entry_id != entry.entry_id
):
if apps_coordinator := hass.data.pop(DATA_APPS, None):
await apps_coordinator.async_shutdown()
return unload_ok

View File

@@ -8,13 +8,12 @@ import socket
from typing import Any
from pyvizio import VizioAsync, async_guess_device_type
from pyvizio.const import APP_HOME
from pyvizio.const import APP_HOME, APPS
import voluptuous as vol
from homeassistant.components.media_player import MediaPlayerDeviceClass
from homeassistant.config_entries import (
SOURCE_ZEROCONF,
ConfigEntry,
ConfigFlow,
ConfigFlowResult,
OptionsFlow,
@@ -34,6 +33,7 @@ from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from homeassistant.util.network import is_ip_address
from . import DATA_APPS
from .const import (
CONF_APPS,
CONF_APPS_TO_INCLUDE_OR_EXCLUDE,
@@ -45,6 +45,7 @@ from .const import (
DEVICE_ID,
DOMAIN,
)
from .coordinator import VizioConfigEntry
_LOGGER = logging.getLogger(__name__)
@@ -106,6 +107,14 @@ def _host_is_same(host1: str, host2: str) -> bool:
class VizioOptionsConfigFlow(OptionsFlow):
"""Handle Vizio options."""
def _get_app_list(self) -> list[dict[str, Any]]:
"""Return the current apps list, falling back to defaults."""
if (
apps_coordinator := self.hass.data.get(DATA_APPS)
) and apps_coordinator.data:
return apps_coordinator.data
return APPS
async def async_step_init(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
@@ -157,10 +166,7 @@ class VizioOptionsConfigFlow(OptionsFlow):
): cv.multi_select(
[
APP_HOME["name"],
*(
app["name"]
for app in self.hass.data[DOMAIN][CONF_APPS].data
),
*(app["name"] for app in self._get_app_list()),
]
),
}
@@ -176,7 +182,9 @@ class VizioConfigFlow(ConfigFlow, domain=DOMAIN):
@staticmethod
@callback
def async_get_options_flow(config_entry: ConfigEntry) -> VizioOptionsConfigFlow:
def async_get_options_flow(
config_entry: VizioConfigEntry,
) -> VizioOptionsConfigFlow:
"""Get the options flow for this handler."""
return VizioOptionsConfigFlow()

View File

@@ -2,22 +2,150 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import Any
from typing import TYPE_CHECKING, Any
from pyvizio.const import APPS
from pyvizio import VizioAsync
from pyvizio.api.apps import AppConfig
from pyvizio.api.input import InputItem
from pyvizio.const import APPS, INPUT_APPS
from pyvizio.util import gen_apps_list_from_url
from homeassistant.components.media_player import MediaPlayerDeviceClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DEVICE_CLASS, CONF_HOST, CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.storage import Store
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
from .const import DOMAIN, VIZIO_AUDIO_SETTINGS, VIZIO_SOUND_MODE
type VizioConfigEntry = ConfigEntry[VizioRuntimeData]
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(seconds=30)
@dataclass(frozen=True)
class VizioRuntimeData:
"""Runtime data for Vizio integration."""
device_coordinator: VizioDeviceCoordinator
@dataclass(frozen=True)
class VizioDeviceData:
"""Raw data fetched from Vizio device."""
# Power state
is_on: bool
# Audio settings from get_all_settings("audio")
audio_settings: dict[str, Any] | None = None
# Sound mode options from get_setting_options("audio", "eq")
sound_mode_list: list[str] | None = None
# Current input from get_current_input()
current_input: str | None = None
# Available inputs from get_inputs_list()
input_list: list[InputItem] | None = None
# Current app config from get_current_app_config() (TVs only)
current_app_config: AppConfig | None = None
class VizioDeviceCoordinator(DataUpdateCoordinator[VizioDeviceData]):
"""Coordinator for Vizio device data."""
config_entry: VizioConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: VizioConfigEntry,
device: VizioAsync,
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=DOMAIN,
update_interval=SCAN_INTERVAL,
)
self.device = device
async def _async_setup(self) -> None:
"""Fetch device info and update device registry."""
model = await self.device.get_model_name(log_api_exception=False)
version = await self.device.get_version(log_api_exception=False)
if TYPE_CHECKING:
assert self.config_entry.unique_id
device_registry = dr.async_get(self.hass)
device_registry.async_get_or_create(
config_entry_id=self.config_entry.entry_id,
identifiers={(DOMAIN, self.config_entry.unique_id)},
manufacturer="VIZIO",
name=self.config_entry.data[CONF_NAME],
model=model,
sw_version=version,
)
async def _async_update_data(self) -> VizioDeviceData:
"""Fetch all device data."""
is_on = await self.device.get_power_state(log_api_exception=False)
if is_on is None:
raise UpdateFailed(
f"Unable to connect to {self.config_entry.data[CONF_HOST]}"
)
if not is_on:
return VizioDeviceData(is_on=False)
# Device is on - fetch all data
audio_settings = await self.device.get_all_settings(
VIZIO_AUDIO_SETTINGS, log_api_exception=False
)
sound_mode_list = None
if audio_settings and VIZIO_SOUND_MODE in audio_settings:
sound_mode_list = await self.device.get_setting_options(
VIZIO_AUDIO_SETTINGS, VIZIO_SOUND_MODE, log_api_exception=False
)
current_input = await self.device.get_current_input(log_api_exception=False)
input_list = await self.device.get_inputs_list(log_api_exception=False)
current_app_config = None
# Only attempt to fetch app config if the device is a TV and supports apps
if (
self.config_entry.data[CONF_DEVICE_CLASS] == MediaPlayerDeviceClass.TV
and input_list
and any(input_item.name in INPUT_APPS for input_item in input_list)
):
current_app_config = await self.device.get_current_app_config(
log_api_exception=False
)
return VizioDeviceData(
is_on=True,
audio_settings=audio_settings,
sound_mode_list=sound_mode_list,
current_input=current_input,
input_list=input_list,
current_app_config=current_app_config,
)
class VizioAppsDataUpdateCoordinator(DataUpdateCoordinator[list[dict[str, Any]]]):
"""Define an object to hold Vizio app config data."""

View File

@@ -2,11 +2,7 @@
from __future__ import annotations
from datetime import timedelta
import logging
from pyvizio import AppConfig, VizioAsync
from pyvizio.api.apps import find_app_name
from pyvizio.api.apps import AppConfig, find_app_name
from pyvizio.const import APP_HOME, INPUT_APPS, NO_APP_RUNNING, UNKNOWN_APP
from homeassistant.components.media_player import (
@@ -15,58 +11,45 @@ from homeassistant.components.media_player import (
MediaPlayerEntityFeature,
MediaPlayerState,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_ACCESS_TOKEN,
CONF_DEVICE_CLASS,
CONF_EXCLUDE,
CONF_HOST,
CONF_INCLUDE,
CONF_NAME,
)
from homeassistant.const import CONF_DEVICE_CLASS, CONF_EXCLUDE, CONF_INCLUDE
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.dispatcher import (
async_dispatcher_connect,
async_dispatcher_send,
)
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import DATA_APPS
from .const import (
CONF_ADDITIONAL_CONFIGS,
CONF_APPS,
CONF_VOLUME_STEP,
DEFAULT_TIMEOUT,
DEFAULT_VOLUME_STEP,
DEVICE_ID,
DOMAIN,
SUPPORTED_COMMANDS,
VIZIO_AUDIO_SETTINGS,
VIZIO_DEVICE_CLASSES,
VIZIO_MUTE,
VIZIO_MUTE_ON,
VIZIO_SOUND_MODE,
VIZIO_VOLUME,
)
from .coordinator import VizioAppsDataUpdateCoordinator
from .coordinator import (
VizioAppsDataUpdateCoordinator,
VizioConfigEntry,
VizioDeviceCoordinator,
)
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(seconds=30)
PARALLEL_UPDATES = 0
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
config_entry: VizioConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up a Vizio media player entry."""
host = config_entry.data[CONF_HOST]
token = config_entry.data.get(CONF_ACCESS_TOKEN)
name = config_entry.data[CONF_NAME]
device_class = config_entry.data[CONF_DEVICE_CLASS]
# If config entry options not set up, set them up,
@@ -105,59 +88,51 @@ async def async_setup_entry(
**params, # type: ignore[arg-type]
)
device = VizioAsync(
DEVICE_ID,
host,
name,
auth_token=token,
device_type=VIZIO_DEVICE_CLASSES[device_class],
session=async_get_clientsession(hass, False),
timeout=DEFAULT_TIMEOUT,
entity = VizioDevice(
config_entry,
device_class,
config_entry.runtime_data.device_coordinator,
hass.data.get(DATA_APPS) if device_class == MediaPlayerDeviceClass.TV else None,
)
apps_coordinator = hass.data[DOMAIN].get(CONF_APPS)
entity = VizioDevice(config_entry, device, name, device_class, apps_coordinator)
async_add_entities([entity], update_before_add=True)
async_add_entities([entity])
class VizioDevice(MediaPlayerEntity):
class VizioDevice(CoordinatorEntity[VizioDeviceCoordinator], MediaPlayerEntity):
"""Media Player implementation which performs REST requests to device."""
_attr_has_entity_name = True
_attr_name = None
_received_device_info = False
_current_input: str | None = None
_current_app_config: AppConfig | None = None
def __init__(
self,
config_entry: ConfigEntry,
device: VizioAsync,
name: str,
config_entry: VizioConfigEntry,
device_class: MediaPlayerDeviceClass,
coordinator: VizioDeviceCoordinator,
apps_coordinator: VizioAppsDataUpdateCoordinator | None,
) -> None:
"""Initialize Vizio device."""
super().__init__(coordinator)
self._config_entry = config_entry
self._apps_coordinator = apps_coordinator
self._volume_step = config_entry.options[CONF_VOLUME_STEP]
self._current_input: str | None = None
self._current_app_config: AppConfig | None = None
self._attr_sound_mode_list = []
self._available_inputs: list[str] = []
self._available_apps: list[str] = []
self._volume_step = config_entry.options[CONF_VOLUME_STEP]
self._all_apps = apps_coordinator.data if apps_coordinator else None
self._conf_apps = config_entry.options.get(CONF_APPS, {})
self._additional_app_configs = config_entry.data.get(CONF_APPS, {}).get(
CONF_ADDITIONAL_CONFIGS, []
)
self._device = device
self._max_volume = float(device.get_max_volume())
self._attr_assumed_state = True
self._device = coordinator.device
self._max_volume = float(coordinator.device.get_max_volume())
# Entity class attributes that will change with each update (we only include
# the ones that are initialized differently from the defaults)
self._attr_sound_mode_list = []
self._attr_supported_features = SUPPORTED_COMMANDS[device_class]
# Entity class attributes that will not change
@@ -165,11 +140,7 @@ class VizioDevice(MediaPlayerEntity):
assert unique_id
self._attr_unique_id = unique_id
self._attr_device_class = device_class
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, unique_id)},
manufacturer="VIZIO",
name=name,
)
self._attr_device_info = DeviceInfo(identifiers={(DOMAIN, unique_id)})
def _apps_list(self, apps: list[str]) -> list[str]:
"""Return process apps list based on configured filters."""
@@ -181,112 +152,72 @@ class VizioDevice(MediaPlayerEntity):
return apps
async def async_update(self) -> None:
"""Retrieve latest state of the device."""
if (
is_on := await self._device.get_power_state(log_api_exception=False)
) is None:
if self._attr_available:
_LOGGER.warning(
"Lost connection to %s", self._config_entry.data[CONF_HOST]
)
self._attr_available = False
return
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
data = self.coordinator.data
if not self._attr_available:
_LOGGER.warning(
"Restored connection to %s", self._config_entry.data[CONF_HOST]
)
self._attr_available = True
if not self._received_device_info:
device_reg = dr.async_get(self.hass)
assert self._config_entry.unique_id
device = device_reg.async_get_device(
identifiers={(DOMAIN, self._config_entry.unique_id)}
)
if device:
device_reg.async_update_device(
device.id,
model=await self._device.get_model_name(log_api_exception=False),
sw_version=await self._device.get_version(log_api_exception=False),
)
self._received_device_info = True
if not is_on:
# Handle device off
if not data.is_on:
self._attr_state = MediaPlayerState.OFF
self._attr_volume_level = None
self._attr_is_volume_muted = None
self._current_input = None
self._attr_app_name = None
self._current_app_config = None
self._attr_sound_mode = None
self._attr_app_name = None
self._current_input = None
self._current_app_config = None
super()._handle_coordinator_update()
return
# Device is on - apply coordinator data
self._attr_state = MediaPlayerState.ON
if audio_settings := await self._device.get_all_settings(
VIZIO_AUDIO_SETTINGS, log_api_exception=False
):
# Audio settings
if data.audio_settings:
self._attr_volume_level = (
float(audio_settings[VIZIO_VOLUME]) / self._max_volume
float(data.audio_settings[VIZIO_VOLUME]) / self._max_volume
)
if VIZIO_MUTE in audio_settings:
if VIZIO_MUTE in data.audio_settings:
self._attr_is_volume_muted = (
audio_settings[VIZIO_MUTE].lower() == VIZIO_MUTE_ON
data.audio_settings[VIZIO_MUTE].lower() == VIZIO_MUTE_ON
)
else:
self._attr_is_volume_muted = None
if VIZIO_SOUND_MODE in audio_settings:
if VIZIO_SOUND_MODE in data.audio_settings:
self._attr_supported_features |= (
MediaPlayerEntityFeature.SELECT_SOUND_MODE
)
self._attr_sound_mode = audio_settings[VIZIO_SOUND_MODE]
self._attr_sound_mode = data.audio_settings[VIZIO_SOUND_MODE]
if not self._attr_sound_mode_list:
self._attr_sound_mode_list = await self._device.get_setting_options(
VIZIO_AUDIO_SETTINGS,
VIZIO_SOUND_MODE,
log_api_exception=False,
)
self._attr_sound_mode_list = data.sound_mode_list or []
else:
# Explicitly remove MediaPlayerEntityFeature.SELECT_SOUND_MODE from supported features
self._attr_supported_features &= (
~MediaPlayerEntityFeature.SELECT_SOUND_MODE
)
if input_ := await self._device.get_current_input(log_api_exception=False):
self._current_input = input_
# Input state
if data.current_input:
self._current_input = data.current_input
if data.input_list:
self._available_inputs = [i.name for i in data.input_list]
# If no inputs returned, end update
if not (inputs := await self._device.get_inputs_list(log_api_exception=False)):
return
self._available_inputs = [input_.name for input_ in inputs]
# Return before setting app variables if INPUT_APPS isn't in available inputs
if self._attr_device_class == MediaPlayerDeviceClass.SPEAKER or not any(
app for app in INPUT_APPS if app in self._available_inputs
# App state (TV only) - check if device supports apps
if (
self._attr_device_class == MediaPlayerDeviceClass.TV
and self._available_inputs
and any(app in self._available_inputs for app in INPUT_APPS)
):
return
all_apps = self._all_apps or ()
self._available_apps = self._apps_list([app["name"] for app in all_apps])
self._current_app_config = data.current_app_config
self._attr_app_name = find_app_name(
self._current_app_config,
[APP_HOME, *all_apps, *self._additional_app_configs],
)
if self._attr_app_name == NO_APP_RUNNING:
self._attr_app_name = None
# Create list of available known apps from known app list after
# filtering by CONF_INCLUDE/CONF_EXCLUDE
self._available_apps = self._apps_list(
[app["name"] for app in self._all_apps or ()]
)
self._current_app_config = await self._device.get_current_app_config(
log_api_exception=False
)
self._attr_app_name = find_app_name(
self._current_app_config,
[APP_HOME, *(self._all_apps or ()), *self._additional_app_configs],
)
if self._attr_app_name == NO_APP_RUNNING:
self._attr_app_name = None
super()._handle_coordinator_update()
def _get_additional_app_names(self) -> list[str]:
"""Return list of additional apps that were included in configuration.yaml."""
@@ -296,7 +227,7 @@ class VizioDevice(MediaPlayerEntity):
@staticmethod
async def _async_send_update_options_signal(
hass: HomeAssistant, config_entry: ConfigEntry
hass: HomeAssistant, config_entry: VizioConfigEntry
) -> None:
"""Send update event when Vizio config entry is updated."""
# Move this method to component level if another entity ever gets added for a
@@ -304,7 +235,7 @@ class VizioDevice(MediaPlayerEntity):
# See here: https://github.com/home-assistant/core/pull/30653#discussion_r366426121
async_dispatcher_send(hass, config_entry.entry_id, config_entry)
async def _async_update_options(self, config_entry: ConfigEntry) -> None:
async def _async_update_options(self, config_entry: VizioConfigEntry) -> None:
"""Update options if the update signal comes from this entity."""
self._volume_step = config_entry.options[CONF_VOLUME_STEP]
# Update so that CONF_ADDITIONAL_CONFIGS gets retained for imports
@@ -323,6 +254,11 @@ class VizioDevice(MediaPlayerEntity):
async def async_added_to_hass(self) -> None:
"""Register callbacks when entity is added."""
await super().async_added_to_hass()
# Process initial coordinator data
self._handle_coordinator_update()
# Register callback for when config entry is updated.
self.async_on_remove(
self._config_entry.add_update_listener(
@@ -337,21 +273,17 @@ class VizioDevice(MediaPlayerEntity):
)
)
if not self._apps_coordinator:
if not (apps_coordinator := self._apps_coordinator):
return
# Register callback for app list updates if device is a TV
@callback
def apps_list_update() -> None:
"""Update list of all apps."""
if not self._apps_coordinator:
return
self._all_apps = self._apps_coordinator.data
self._all_apps = apps_coordinator.data
self.async_write_ha_state()
self.async_on_remove(
self._apps_coordinator.async_add_listener(apps_list_update)
)
self.async_on_remove(apps_coordinator.async_add_listener(apps_list_update))
@property
def source(self) -> str | None:

4
requirements_all.txt generated
View File

@@ -276,7 +276,7 @@ aioguardian==2026.01.1
aioharmony==0.5.3
# homeassistant.components.hassio
aiohasupervisor==0.4.1
aiohasupervisor==0.4.2
# homeassistant.components.home_connect
aiohomeconnect==0.32.0
@@ -2494,7 +2494,7 @@ pysmappee==0.2.29
pysmarlaapi==1.0.2
# homeassistant.components.smartthings
pysmartthings==3.7.0
pysmartthings==3.7.2
# homeassistant.components.smarty
pysmarty2==0.10.3

View File

@@ -264,7 +264,7 @@ aioguardian==2026.01.1
aioharmony==0.5.3
# homeassistant.components.hassio
aiohasupervisor==0.4.1
aiohasupervisor==0.4.2
# homeassistant.components.home_connect
aiohomeconnect==0.32.0
@@ -2126,7 +2126,7 @@ pysmappee==0.2.29
pysmarlaapi==1.0.2
# homeassistant.components.smartthings
pysmartthings==3.7.0
pysmartthings==3.7.2
# homeassistant.components.smarty
pysmarty2==0.10.3

View File

@@ -127,7 +127,7 @@ async def test_button_triggers_gated_by_labs_flag(
),
],
)
async def test_button_state_trigger_behavior_any(
async def test_button_state_trigger(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_buttons: dict[str, list[str]],
@@ -137,7 +137,7 @@ async def test_button_state_trigger_behavior_any(
trigger: str,
states: list[TriggerStateDescription],
) -> None:
"""Test that the button state trigger fires when any button state changes to a specific state."""
"""Test that the button state trigger fires when targeted button state changes."""
other_entity_ids = set(target_buttons["included"]) - {entity_id}
# Set all buttons, including the tested button, to the initial state

View File

@@ -9,7 +9,7 @@
'discoverable': True,
'group': False,
'locked': False,
'created_at': datetime.datetime(2016, 11, 24, 0, 0, tzinfo=tzutc()),
'created_at': datetime.datetime(2016, 11, 24, 0, 0, tzinfo=tzlocal()),
'following_count': 328,
'followers_count': 3169,
'statuses_count': 69523,

View File

@@ -6,10 +6,11 @@ from unittest.mock import AsyncMock, patch
from opower import CostRead
import pytest
from homeassistant.components.opower.const import DOMAIN
from homeassistant.components.recorder import Recorder
from homeassistant.const import ATTR_UNIT_OF_MEASUREMENT, UnitOfEnergy, UnitOfVolume
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.util import dt as dt_util
from tests.common import MockConfigEntry
@@ -114,3 +115,111 @@ async def test_sensors(
state = hass.states.get("sensor.gas_account_222222_last_updated")
assert state
assert state.state == "2023-01-02T08:00:00+00:00"
async def test_dynamic_and_stale_devices(
recorder_mock: Recorder,
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_opower_api: AsyncMock,
device_registry: dr.DeviceRegistry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test the dynamic addition and removal of Opower devices."""
original_accounts = mock_opower_api.async_get_accounts.return_value
original_forecasts = mock_opower_api.async_get_forecast.return_value
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
devices = dr.async_entries_for_config_entry(
device_registry, mock_config_entry.entry_id
)
entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
initial_device_ids = {device.id for device in devices}
initial_entity_ids = {entity.entity_id for entity in entities}
# Ensure we actually created some devices and entities for this entry
assert initial_device_ids
assert initial_entity_ids
# Remove the second account and update data
mock_opower_api.async_get_accounts.return_value = [original_accounts[0]]
mock_opower_api.async_get_forecast.return_value = [original_forecasts[0]]
coordinator = mock_config_entry.runtime_data
await coordinator.async_refresh()
await hass.async_block_till_done()
devices = dr.async_entries_for_config_entry(
device_registry, mock_config_entry.entry_id
)
entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
device_ids_after_removal = {device.id for device in devices}
entity_ids_after_removal = {entity.entity_id for entity in entities}
# After removing one account, we should have removed some devices/entities
# but not added any new ones.
assert device_ids_after_removal <= initial_device_ids
assert entity_ids_after_removal <= initial_entity_ids
assert device_ids_after_removal != initial_device_ids
assert entity_ids_after_removal != initial_entity_ids
# Add back the second account
mock_opower_api.async_get_accounts.return_value = original_accounts
mock_opower_api.async_get_forecast.return_value = original_forecasts
await coordinator.async_refresh()
await hass.async_block_till_done()
devices = dr.async_entries_for_config_entry(
device_registry, mock_config_entry.entry_id
)
entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
device_ids_after_restore = {device.id for device in devices}
entity_ids_after_restore = {entity.entity_id for entity in entities}
# After restoring the second account, we should be back to the original
# number of devices and entities (IDs themselves may change on re-create).
assert len(device_ids_after_restore) == len(initial_device_ids)
assert len(entity_ids_after_restore) == len(initial_entity_ids)
async def test_stale_device_removed_on_load(
recorder_mock: Recorder,
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_opower_api: AsyncMock,
device_registry: dr.DeviceRegistry,
) -> None:
"""Test that a stale device present before setup is removed on first load."""
# Simulate a device that was created by a previous version / old account
# and is already registered before the integration sets up.
stale_device = device_registry.async_get_or_create(
config_entry_id=mock_config_entry.entry_id,
identifiers={(DOMAIN, "pge_stale_account_99999")},
)
assert device_registry.async_get(stale_device.id) is not None
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
# Stale device should have been removed on first coordinator update
assert device_registry.async_get(stale_device.id) is None
# Active devices for known accounts should still be present,
# and the stale identifier should no longer be registered.
active_devices = dr.async_entries_for_config_entry(
device_registry, mock_config_entry.entry_id
)
active_identifiers = {
identifier
for device in active_devices
for (_domain, identifier) in device.identifiers
}
assert "pge_111111" in active_identifiers
assert "pge_222222" in active_identifiers
assert "pge_stale_account_99999" not in active_identifiers

View File

@@ -127,7 +127,7 @@ async def test_scene_triggers_gated_by_labs_flag(
),
],
)
async def test_scene_state_trigger_behavior_any(
async def test_scene_state_trigger(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_scenes: dict[str, list[str]],
@@ -137,7 +137,7 @@ async def test_scene_state_trigger_behavior_any(
trigger: str,
states: list[TriggerStateDescription],
) -> None:
"""Test that the scene state trigger fires when any scene state changes to a specific state."""
"""Test that the scene state trigger fires when targeted scene state changes."""
other_entity_ids = set(target_scenes["included"]) - {entity_id}
# Set all scenes, including the tested scene, to the initial state

View File

@@ -88,7 +88,7 @@ async def test_text_triggers_gated_by_labs_flag(
),
],
)
async def test_text_state_trigger_behavior_any(
async def test_text_state_trigger(
hass: HomeAssistant,
service_calls: list[ServiceCall],
target_texts: dict[str, list[str]],
@@ -98,7 +98,7 @@ async def test_text_state_trigger_behavior_any(
trigger: str,
states: list[TriggerStateDescription],
) -> None:
"""Test that the text state trigger fires when any text state changes to a specific state."""
"""Test that the text state trigger fires when targeted text state changes."""
other_entity_ids = set(target_texts["included"]) - {entity_id}
# Set all texts, including the tested text, to the initial state

View File

@@ -1,4 +1,58 @@
# serializer version: 1
# name: test_platform_setup_and_discovery[alarm_control_panel.c30-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'alarm_control_panel',
'entity_category': None,
'entity_id': 'alarm_control_panel.c30',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': None,
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': None,
'platform': 'tuya',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <AlarmControlPanelEntityFeature: 11>,
'translation_key': None,
'unique_id': 'tuya.rirsc4vhpbv2whkp2gwmaster_mode',
'unit_of_measurement': None,
})
# ---
# name: test_platform_setup_and_discovery[alarm_control_panel.c30-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'changed_by': None,
'code_arm_required': False,
'code_format': None,
'friendly_name': 'C30',
'supported_features': <AlarmControlPanelEntityFeature: 11>,
}),
'context': <ANY>,
'entity_id': 'alarm_control_panel.c30',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'disarmed',
})
# ---
# name: test_platform_setup_and_discovery[alarm_control_panel.multifunction_alarm-entry]
EntityRegistryEntrySnapshot({
'aliases': list([

View File

@@ -101,6 +101,57 @@
'state': 'off',
})
# ---
# name: test_platform_setup_and_discovery[binary_sensor.c30_charging-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'binary_sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'binary_sensor.c30_charging',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Charging',
'options': dict({
}),
'original_device_class': <BinarySensorDeviceClass.BATTERY_CHARGING: 'battery_charging'>,
'original_icon': None,
'original_name': 'Charging',
'platform': 'tuya',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': 'tuya.rirsc4vhpbv2whkp2gwcharge_state',
'unit_of_measurement': None,
})
# ---
# name: test_platform_setup_and_discovery[binary_sensor.c30_charging-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'battery_charging',
'friendly_name': 'C30 Charging',
}),
'context': <ANY>,
'entity_id': 'binary_sensor.c30_charging',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'off',
})
# ---
# name: test_platform_setup_and_discovery[binary_sensor.cat_feeder_feeding-entry]
EntityRegistryEntrySnapshot({
'aliases': list([

View File

@@ -7150,7 +7150,7 @@
'labels': set({
}),
'manufacturer': 'Tuya',
'model': 'C30 (unsupported)',
'model': 'C30',
'model_id': 'pkhw2vbphv4csrir',
'name': 'C30',
'name_by_user': None,

View File

@@ -3944,6 +3944,61 @@
'state': '0.0',
})
# ---
# name: test_platform_setup_and_discovery[sensor.c30_battery-entry]
EntityRegistryEntrySnapshot({
'aliases': list([
None,
]),
'area_id': None,
'capabilities': dict({
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'sensor',
'entity_category': <EntityCategory.DIAGNOSTIC: 'diagnostic'>,
'entity_id': 'sensor.c30_battery',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Battery',
'options': dict({
}),
'original_device_class': <SensorDeviceClass.BATTERY: 'battery'>,
'original_icon': None,
'original_name': 'Battery',
'platform': 'tuya',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'battery',
'unique_id': 'tuya.rirsc4vhpbv2whkp2gwbattery_percentage',
'unit_of_measurement': '%',
})
# ---
# name: test_platform_setup_and_discovery[sensor.c30_battery-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'device_class': 'battery',
'friendly_name': 'C30 Battery',
'state_class': <SensorStateClass.MEASUREMENT: 'measurement'>,
'unit_of_measurement': '%',
}),
'context': <ANY>,
'entity_id': 'sensor.c30_battery',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '85.0',
})
# ---
# name: test_platform_setup_and_discovery[sensor.c9_battery-entry]
EntityRegistryEntrySnapshot({
'aliases': list([

View File

@@ -43,8 +43,11 @@ async def test_platform_setup_and_discovery(
@patch("homeassistant.components.tuya.PLATFORMS", [Platform.ALARM_CONTROL_PANEL])
@pytest.mark.parametrize(
"mock_device_code",
["mal_gyitctrjj1kefxp2"],
("mock_device_code", "entity_id"),
[
("mal_gyitctrjj1kefxp2", "alarm_control_panel.multifunction_alarm"),
("wg2_pkhw2vbphv4csrir", "alarm_control_panel.c30"),
],
)
@pytest.mark.parametrize(
("service", "command"),
@@ -62,9 +65,9 @@ async def test_service(
mock_device: CustomerDevice,
service: str,
command: dict[str, Any],
entity_id: str,
) -> None:
"""Test service."""
entity_id = "alarm_control_panel.multifunction_alarm"
await initialize_entry(hass, mock_manager, mock_config_entry, mock_device)
state = hass.states.get(entity_id)
@@ -82,8 +85,11 @@ async def test_service(
@patch("homeassistant.components.tuya.PLATFORMS", [Platform.ALARM_CONTROL_PANEL])
@pytest.mark.parametrize(
"mock_device_code",
["mal_gyitctrjj1kefxp2"],
("mock_device_code", "entity_id"),
[
("mal_gyitctrjj1kefxp2", "alarm_control_panel.multifunction_alarm"),
("wg2_pkhw2vbphv4csrir", "alarm_control_panel.c30"),
],
)
@pytest.mark.parametrize(
("status_updates", "expected_state"),
@@ -131,9 +137,9 @@ async def test_state(
mock_device: CustomerDevice,
status_updates: dict[str, Any],
expected_state: str,
entity_id: str,
) -> None:
"""Test state."""
entity_id = "alarm_control_panel.multifunction_alarm"
mock_device.status.update(status_updates)
await initialize_entry(hass, mock_manager, mock_config_entry, mock_device)

View File

@@ -30,13 +30,6 @@ from homeassistant.const import (
SERVICE_TURN_ON,
)
from homeassistant.core import HomeAssistant
from homeassistant.loader import bind_hass
@bind_hass
def turn_on(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Turn all or specified vacuum on."""
hass.add_job(async_turn_on, hass, entity_id)
async def async_turn_on(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
@@ -45,12 +38,6 @@ async def async_turn_on(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL)
await hass.services.async_call(DOMAIN, SERVICE_TURN_ON, data, blocking=True)
@bind_hass
def turn_off(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Turn all or specified vacuum off."""
hass.add_job(async_turn_off, hass, entity_id)
async def async_turn_off(
hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL
) -> None:
@@ -59,37 +46,18 @@ async def async_turn_off(
await hass.services.async_call(DOMAIN, SERVICE_TURN_OFF, data, blocking=True)
@bind_hass
def toggle(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Toggle all or specified vacuum."""
hass.add_job(async_toggle, hass, entity_id)
async def async_toggle(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Toggle all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
await hass.services.async_call(DOMAIN, SERVICE_TOGGLE, data, blocking=True)
@bind_hass
def locate(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Locate all or specified vacuum."""
hass.add_job(async_locate, hass, entity_id)
async def async_locate(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Locate all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
await hass.services.async_call(DOMAIN, SERVICE_LOCATE, data, blocking=True)
def clean_area(
hass: HomeAssistant, cleaning_area_id: list[str], entity_id: str = ENTITY_MATCH_ALL
) -> None:
"""Tell all or specified vacuum to perform an area clean."""
hass.add_job(async_clean_area, hass, cleaning_area_id, entity_id)
async def async_clean_area(
hass: HomeAssistant,
cleaning_area_id: list[str],
@@ -102,12 +70,6 @@ async def async_clean_area(
await hass.services.async_call(DOMAIN, SERVICE_CLEAN_AREA, data, blocking=True)
@bind_hass
def clean_spot(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Tell all or specified vacuum to perform a spot clean-up."""
hass.add_job(async_clean_spot, hass, entity_id)
async def async_clean_spot(
hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL
) -> None:
@@ -116,12 +78,6 @@ async def async_clean_spot(
await hass.services.async_call(DOMAIN, SERVICE_CLEAN_SPOT, data, blocking=True)
@bind_hass
def return_to_base(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Tell all or specified vacuum to return to base."""
hass.add_job(async_return_to_base, hass, entity_id)
async def async_return_to_base(
hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL
) -> None:
@@ -130,12 +86,6 @@ async def async_return_to_base(
await hass.services.async_call(DOMAIN, SERVICE_RETURN_TO_BASE, data, blocking=True)
@bind_hass
def start_pause(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Tell all or specified vacuum to start or pause the current task."""
hass.add_job(async_start_pause, hass, entity_id)
async def async_start_pause(
hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL
) -> None:
@@ -144,50 +94,24 @@ async def async_start_pause(
await hass.services.async_call(DOMAIN, SERVICE_START_PAUSE, data, blocking=True)
@bind_hass
def start(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Tell all or specified vacuum to start or resume the current task."""
hass.add_job(async_start, hass, entity_id)
async def async_start(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Tell all or specified vacuum to start or resume the current task."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
await hass.services.async_call(DOMAIN, SERVICE_START, data, blocking=True)
@bind_hass
def pause(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Tell all or the specified vacuum to pause the current task."""
hass.add_job(async_pause, hass, entity_id)
async def async_pause(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Tell all or the specified vacuum to pause the current task."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
await hass.services.async_call(DOMAIN, SERVICE_PAUSE, data, blocking=True)
@bind_hass
def stop(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Stop all or specified vacuum."""
hass.add_job(async_stop, hass, entity_id)
async def async_stop(hass: HomeAssistant, entity_id: str = ENTITY_MATCH_ALL) -> None:
"""Stop all or specified vacuum."""
data = {ATTR_ENTITY_ID: entity_id} if entity_id else None
await hass.services.async_call(DOMAIN, SERVICE_STOP, data, blocking=True)
@bind_hass
def set_fan_speed(
hass: HomeAssistant, fan_speed: str, entity_id: str = ENTITY_MATCH_ALL
) -> None:
"""Set fan speed for all or specified vacuum."""
hass.add_job(async_set_fan_speed, hass, fan_speed, entity_id)
async def async_set_fan_speed(
hass: HomeAssistant, fan_speed: str, entity_id: str = ENTITY_MATCH_ALL
) -> None:
@@ -197,17 +121,6 @@ async def async_set_fan_speed(
await hass.services.async_call(DOMAIN, SERVICE_SET_FAN_SPEED, data, blocking=True)
@bind_hass
def send_command(
hass: HomeAssistant,
command: str,
params: dict[str, Any] | list[Any] | None = None,
entity_id: str = ENTITY_MATCH_ALL,
) -> None:
"""Send command to all or specified vacuum."""
hass.add_job(async_send_command, hass, command, params, entity_id)
async def async_send_command(
hass: HomeAssistant,
command: str,

View File

@@ -290,7 +290,7 @@
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
'state': 'no_error',
})
# ---
# name: test_sensors[ac_charger][sensor.smart_charger_output_phase_1_current-entry]
@@ -2861,7 +2861,7 @@
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'unknown',
'state': 'no_error',
})
# ---
# name: test_sensors[solar_charger][sensor.solar_charger_external_device_load-entry]

View File

@@ -4,6 +4,8 @@ from home_assistant_bluetooth import BluetoothServiceInfo
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.victron_ble.const import DOMAIN, VICTRON_IDENTIFIER
from homeassistant.const import CONF_ACCESS_TOKEN
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
@@ -25,6 +27,19 @@ from .fixtures import (
from tests.common import MockConfigEntry, snapshot_platform
from tests.components.bluetooth import inject_bluetooth_service_info
# Crafted solar charger advertisements with specific charger_error values.
# These are real encrypted payloads using VICTRON_SOLAR_CHARGER_TOKEN.
SOLAR_CHARGER_ERROR_PAYLOADS = {
# ChargerError.NO_ERROR -> state "no_error"
"no_error": "100242a0016207adceb37b605d7e0ee21b24df5c0404040410951e81ea42b0492e356ad5ed8f7eb7",
# ChargerError.INTERNAL_SUPPLY_A -> mapped to state "internal_supply"
"internal_supply": "100242a0016207adce787b605d7e0ee21b24df5c0404040410951e81ea42b0492e356ad5ed8f7eb7",
# ChargerError.VOLTAGE_HIGH -> state "voltage_high"
"voltage_high": "100242a0016207adceb17b605d7e0ee21b24df5c0404040410951e81ea42b0492e356ad5ed8f7eb7",
# ChargerError.NETWORK_A -> mapped to state "network"
"network": "100242a0016207adcef77b605d7e0ee21b24df5c0404040410951e81ea42b0492e356ad5ed8f7eb7",
}
@pytest.mark.usefixtures("enable_bluetooth")
@pytest.mark.parametrize(
@@ -72,3 +87,48 @@ async def test_sensors(
# Use snapshot testing to verify all entity states and registry entries
await snapshot_platform(hass, entity_registry, snapshot, entry.entry_id)
@pytest.mark.usefixtures("enable_bluetooth")
@pytest.mark.parametrize(
("payload_hex", "expected_state"),
[
(SOLAR_CHARGER_ERROR_PAYLOADS["no_error"], "no_error"),
(SOLAR_CHARGER_ERROR_PAYLOADS["internal_supply"], "internal_supply"),
(SOLAR_CHARGER_ERROR_PAYLOADS["voltage_high"], "voltage_high"),
(SOLAR_CHARGER_ERROR_PAYLOADS["network"], "network"),
],
ids=["no_error", "internal_supply_variant", "voltage_high", "network_variant"],
)
async def test_charger_error_state(
hass: HomeAssistant,
payload_hex: str,
expected_state: str,
) -> None:
"""Test that charger error values are correctly mapped to sensor states."""
entry = MockConfigEntry(
domain=DOMAIN,
data={CONF_ACCESS_TOKEN: VICTRON_SOLAR_CHARGER_TOKEN},
unique_id=VICTRON_SOLAR_CHARGER_SERVICE_INFO.address,
)
entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
service_info = BluetoothServiceInfo(
name="Solar Charger",
address=VICTRON_SOLAR_CHARGER_SERVICE_INFO.address,
rssi=-60,
manufacturer_data={VICTRON_IDENTIFIER: bytes.fromhex(payload_hex)},
service_data={},
service_uuids=[],
source="local",
)
inject_bluetooth_service_info(hass, service_info)
await hass.async_block_till_done()
state = hass.states.get("sensor.solar_charger_charger_error")
assert state is not None
assert state.state == expected_state

View File

@@ -142,13 +142,36 @@ def vizio_bypass_setup_fixture() -> Generator[None]:
@pytest.fixture(name="vizio_bypass_update")
def vizio_bypass_update_fixture() -> Generator[None]:
"""Mock component update."""
"""Mock component update with minimal data."""
with (
patch(
"homeassistant.components.vizio.media_player.VizioAsync.can_connect_with_auth_check",
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=True,
),
patch("homeassistant.components.vizio.media_player.VizioDevice.async_update"),
patch(
"homeassistant.components.vizio.VizioAsync.get_all_settings",
return_value=None,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_current_input",
return_value=None,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_inputs_list",
return_value=None,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_current_app_config",
return_value=None,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_model_name",
return_value=None,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_version",
return_value=None,
),
):
yield
@@ -172,7 +195,15 @@ def vizio_cant_connect_fixture() -> Generator[None]:
AsyncMock(return_value=False),
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_power_state",
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=None,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_model_name",
return_value=None,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_version",
return_value=None,
),
):
@@ -184,11 +215,7 @@ def vizio_update_fixture() -> Generator[None]:
"""Mock valid updates to vizio device."""
with (
patch(
"homeassistant.components.vizio.media_player.VizioAsync.can_connect_with_auth_check",
return_value=True,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_all_settings",
"homeassistant.components.vizio.VizioAsync.get_all_settings",
return_value={
"volume": int(MAX_VOLUME[DEVICE_CLASS_SPEAKER] / 2),
"eq": CURRENT_EQ,
@@ -196,29 +223,33 @@ def vizio_update_fixture() -> Generator[None]:
},
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_setting_options",
"homeassistant.components.vizio.VizioAsync.get_setting_options",
return_value=EQ_LIST,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_current_input",
"homeassistant.components.vizio.VizioAsync.get_current_input",
return_value=CURRENT_INPUT,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_inputs_list",
"homeassistant.components.vizio.VizioAsync.get_inputs_list",
return_value=get_mock_inputs(INPUT_LIST),
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_power_state",
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=True,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_model_name",
"homeassistant.components.vizio.VizioAsync.get_model_name",
return_value=MODEL,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_version",
"homeassistant.components.vizio.VizioAsync.get_version",
return_value=VERSION,
),
patch(
"homeassistant.components.vizio.VizioAsync.get_current_app_config",
return_value=None,
),
):
yield
@@ -228,15 +259,15 @@ def vizio_update_with_apps_fixture(vizio_update: None) -> Generator[None]:
"""Mock valid updates to vizio device that supports apps."""
with (
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_inputs_list",
"homeassistant.components.vizio.VizioAsync.get_inputs_list",
return_value=get_mock_inputs(INPUT_LIST_WITH_APPS),
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_current_input",
"homeassistant.components.vizio.VizioAsync.get_current_input",
return_value="CAST",
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_current_app_config",
"homeassistant.components.vizio.VizioAsync.get_current_app_config",
return_value=AppConfig(**CURRENT_APP_CONFIG),
),
):
@@ -248,15 +279,15 @@ def vizio_update_with_apps_on_input_fixture(vizio_update: None) -> Generator[Non
"""Mock valid updates to vizio device that supports apps but is on a TV input."""
with (
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_inputs_list",
"homeassistant.components.vizio.VizioAsync.get_inputs_list",
return_value=get_mock_inputs(INPUT_LIST_WITH_APPS),
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_current_input",
"homeassistant.components.vizio.VizioAsync.get_current_input",
return_value=CURRENT_INPUT,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_current_app_config",
"homeassistant.components.vizio.VizioAsync.get_current_app_config",
return_value=AppConfig("unknown", 1, "app"),
),
):

View File

@@ -5,6 +5,7 @@ import dataclasses
import pytest
from homeassistant.components.media_player import MediaPlayerDeviceClass
from homeassistant.components.vizio import DATA_APPS
from homeassistant.components.vizio.const import (
CONF_APPS,
CONF_APPS_TO_INCLUDE_OR_EXCLUDE,
@@ -142,6 +143,36 @@ async def test_tv_options_flow_no_apps(hass: HomeAssistant) -> None:
assert CONF_APPS not in result["data"]
@pytest.mark.usefixtures("vizio_connect", "vizio_bypass_update")
async def test_tv_options_flow_apps_fallback(hass: HomeAssistant) -> None:
"""Test options config flow falls back to default APPS when coordinator absent."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}, data=MOCK_USER_VALID_TV_CONFIG
)
await hass.async_block_till_done()
assert result["type"] is FlowResultType.CREATE_ENTRY
entry = result["result"]
# Remove apps coordinator to simulate it being unavailable
hass.data.pop(DATA_APPS)
result = await hass.config_entries.options.async_init(entry.entry_id, data=None)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "init"
# Completing the flow should still work with the APPS fallback
options = {CONF_VOLUME_STEP: VOLUME_STEP}
options.update(MOCK_INCLUDE_APPS)
result = await hass.config_entries.options.async_configure(
result["flow_id"], user_input=options
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"][CONF_APPS] == {CONF_INCLUDE: [CURRENT_APP]}
@pytest.mark.usefixtures("vizio_connect", "vizio_bypass_update")
async def test_tv_options_flow_with_apps(hass: HomeAssistant) -> None:
"""Test options config flow for TV with providing apps option."""

View File

@@ -7,6 +7,7 @@ from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant.components.media_player import MediaPlayerDeviceClass
from homeassistant.components.vizio import DATA_APPS
from homeassistant.components.vizio.const import DOMAIN
from homeassistant.const import (
CONF_ACCESS_TOKEN,
@@ -17,14 +18,17 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry as dr
from .const import (
APP_LIST,
HOST2,
MOCK_SPEAKER_CONFIG,
MOCK_USER_VALID_TV_CONFIG,
MODEL,
NAME2,
UNIQUE_ID,
VERSION,
)
from tests.common import MockConfigEntry, async_fire_time_changed
@@ -40,7 +44,7 @@ async def test_tv_load_and_unload(hass: HomeAssistant) -> None:
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids(Platform.MEDIA_PLAYER)) == 1
assert DOMAIN in hass.data
assert DATA_APPS in hass.data
assert await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
@@ -48,7 +52,7 @@ async def test_tv_load_and_unload(hass: HomeAssistant) -> None:
assert len(entities) == 1
for entity in entities:
assert hass.states.get(entity).state == STATE_UNAVAILABLE
assert DOMAIN not in hass.data
assert DATA_APPS not in hass.data
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
@@ -61,7 +65,6 @@ async def test_speaker_load_and_unload(hass: HomeAssistant) -> None:
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids(Platform.MEDIA_PLAYER)) == 1
assert DOMAIN in hass.data
assert await hass.config_entries.async_unload(config_entry.entry_id)
await hass.async_block_till_done()
@@ -69,7 +72,6 @@ async def test_speaker_load_and_unload(hass: HomeAssistant) -> None:
assert len(entities) == 1
for entity in entities:
assert hass.states.get(entity).state == STATE_UNAVAILABLE
assert DOMAIN not in hass.data
@pytest.mark.usefixtures(
@@ -88,6 +90,7 @@ async def test_coordinator_update_failure(
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert len(hass.states.async_entity_ids(Platform.MEDIA_PLAYER)) == 1
assert DATA_APPS in hass.data
# Failing 25 days in a row should result in a single log message
# (first one after 10 days, next one would be at 30 days)
@@ -152,3 +155,41 @@ async def test_apps_coordinator_persists_until_last_tv_unloads(
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert mock_fetch.call_count == 0
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_device_registry_model_and_version(
hass: HomeAssistant, device_registry: dr.DeviceRegistry
) -> None:
"""Test that coordinator populates device registry with model and version."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_USER_VALID_TV_CONFIG, unique_id=UNIQUE_ID
)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
device = device_registry.async_get_device(identifiers={(DOMAIN, UNIQUE_ID)})
assert device is not None
assert device.model == MODEL
assert device.sw_version == VERSION
assert device.manufacturer == "VIZIO"
@pytest.mark.usefixtures("vizio_connect", "vizio_bypass_update")
async def test_device_registry_without_model_or_version(
hass: HomeAssistant, device_registry: dr.DeviceRegistry
) -> None:
"""Test device registry when model and version are unavailable."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_USER_VALID_TV_CONFIG, unique_id=UNIQUE_ID
)
config_entry.add_to_hass(hass)
assert await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
device = device_registry.async_get_device(identifiers={(DOMAIN, UNIQUE_ID)})
assert device is not None
assert device.model is None
assert device.sw_version is None
assert device.manufacturer == "VIZIO"

View File

@@ -8,7 +8,7 @@ from datetime import timedelta
from typing import Any
from unittest.mock import call, patch
from freezegun import freeze_time
from freezegun.api import FrozenDateTimeFactory
import pytest
from pyvizio.api.apps import AppConfig
from pyvizio.const import (
@@ -40,6 +40,7 @@ from homeassistant.components.media_player import (
SERVICE_VOLUME_SET,
SERVICE_VOLUME_UP,
MediaPlayerDeviceClass,
MediaPlayerEntityFeature,
)
from homeassistant.components.vizio.const import (
CONF_ADDITIONAL_CONFIGS,
@@ -49,6 +50,7 @@ from homeassistant.components.vizio.const import (
DOMAIN,
)
from homeassistant.components.vizio.services import SERVICE_UPDATE_SETTING
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import ATTR_ENTITY_ID, STATE_OFF, STATE_ON, STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.util import dt as dt_util
@@ -88,15 +90,12 @@ async def _add_config_entry_to_hass(
await hass.async_block_till_done()
def _get_ha_power_state(vizio_power_state: bool | None) -> str:
def _get_ha_power_state(vizio_power_state: bool) -> str:
"""Return HA power state given Vizio power state."""
if vizio_power_state:
return STATE_ON
if vizio_power_state is False:
return STATE_OFF
return STATE_UNAVAILABLE
return STATE_OFF
def _assert_sources_and_volume(attr: dict[str, Any], vizio_device_class: str) -> None:
@@ -124,27 +123,27 @@ def _get_attr_and_assert_base_attr(
@asynccontextmanager
async def _cm_for_test_setup_without_apps(
all_settings: dict[str, Any], vizio_power_state: bool | None
all_settings: dict[str, Any], vizio_power_state: bool
) -> AsyncIterator[None]:
"""Context manager to setup test for Vizio devices without including app specific patches."""
with (
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_all_settings",
"homeassistant.components.vizio.VizioAsync.get_all_settings",
return_value=all_settings,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_setting_options",
"homeassistant.components.vizio.VizioAsync.get_setting_options",
return_value=EQ_LIST,
),
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_power_state",
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=vizio_power_state,
),
):
yield
async def _test_setup_tv(hass: HomeAssistant, vizio_power_state: bool | None) -> None:
async def _test_setup_tv(hass: HomeAssistant, vizio_power_state: bool) -> None:
"""Test Vizio TV entity setup."""
ha_power_state = _get_ha_power_state(vizio_power_state)
@@ -155,7 +154,11 @@ async def _test_setup_tv(hass: HomeAssistant, vizio_power_state: bool | None) ->
)
async with _cm_for_test_setup_without_apps(
{"volume": int(MAX_VOLUME[VIZIO_DEVICE_CLASS_TV] / 2), "mute": "Off"},
{
"volume": int(MAX_VOLUME[VIZIO_DEVICE_CLASS_TV] / 2),
"mute": "Off",
"eq": CURRENT_EQ,
},
vizio_power_state,
):
await _add_config_entry_to_hass(hass, config_entry)
@@ -165,12 +168,10 @@ async def _test_setup_tv(hass: HomeAssistant, vizio_power_state: bool | None) ->
)
if ha_power_state == STATE_ON:
_assert_sources_and_volume(attr, VIZIO_DEVICE_CLASS_TV)
assert "sound_mode" not in attr
assert attr[ATTR_SOUND_MODE] == CURRENT_EQ
async def _test_setup_speaker(
hass: HomeAssistant, vizio_power_state: bool | None
) -> None:
async def _test_setup_speaker(hass: HomeAssistant, vizio_power_state: bool) -> None:
"""Test Vizio Speaker entity setup."""
ha_power_state = _get_ha_power_state(vizio_power_state)
@@ -190,18 +191,14 @@ async def _test_setup_speaker(
audio_settings,
vizio_power_state,
):
with patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_current_app_config",
) as service_call:
await _add_config_entry_to_hass(hass, config_entry)
await _add_config_entry_to_hass(hass, config_entry)
attr = _get_attr_and_assert_base_attr(
hass, MediaPlayerDeviceClass.SPEAKER, ha_power_state
)
if ha_power_state == STATE_ON:
_assert_sources_and_volume(attr, VIZIO_DEVICE_CLASS_SPEAKER)
assert not service_call.called
assert "sound_mode" in attr
attr = _get_attr_and_assert_base_attr(
hass, MediaPlayerDeviceClass.SPEAKER, ha_power_state
)
if ha_power_state == STATE_ON:
_assert_sources_and_volume(attr, VIZIO_DEVICE_CLASS_SPEAKER)
assert "sound_mode" in attr
@asynccontextmanager
@@ -218,7 +215,7 @@ async def _cm_for_test_setup_tv_with_apps(
True,
):
with patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_current_app_config",
"homeassistant.components.vizio.VizioAsync.get_current_app_config",
return_value=AppConfig(**app_config),
):
await _add_config_entry_to_hass(hass, config_entry)
@@ -262,7 +259,7 @@ async def _test_service(
service_data.update(additional_service_data)
with patch(
f"homeassistant.components.vizio.media_player.VizioAsync.{vizio_func_name}"
f"homeassistant.components.vizio.VizioAsync.{vizio_func_name}"
) as service_call:
await hass.services.async_call(
domain,
@@ -288,14 +285,6 @@ async def test_speaker_off(hass: HomeAssistant) -> None:
await _test_setup_speaker(hass, False)
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_speaker_unavailable(
hass: HomeAssistant,
) -> None:
"""Test Vizio Speaker entity setup when unavailable."""
await _test_setup_speaker(hass, None)
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_init_tv_on(hass: HomeAssistant) -> None:
"""Test Vizio TV entity setup when on."""
@@ -308,32 +297,28 @@ async def test_init_tv_off(hass: HomeAssistant) -> None:
await _test_setup_tv(hass, False)
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_init_tv_unavailable(hass: HomeAssistant) -> None:
"""Test Vizio TV entity setup when unavailable."""
await _test_setup_tv(hass, None)
@pytest.mark.usefixtures("vizio_cant_connect")
async def test_setup_unavailable_speaker(hass: HomeAssistant) -> None:
"""Test speaker entity sets up as unavailable."""
"""Test speaker config entry retries setup when device is unavailable."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_SPEAKER_CONFIG, unique_id=UNIQUE_ID
)
await _add_config_entry_to_hass(hass, config_entry)
assert len(hass.states.async_entity_ids(MP_DOMAIN)) == 1
assert hass.states.get("media_player.vizio").state == STATE_UNAVAILABLE
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.SETUP_RETRY
@pytest.mark.usefixtures("vizio_cant_connect")
async def test_setup_unavailable_tv(hass: HomeAssistant) -> None:
"""Test TV entity sets up as unavailable."""
"""Test TV config entry retries setup when device is unavailable."""
config_entry = MockConfigEntry(
domain=DOMAIN, data=MOCK_USER_VALID_TV_CONFIG, unique_id=UNIQUE_ID
)
await _add_config_entry_to_hass(hass, config_entry)
assert len(hass.states.async_entity_ids(MP_DOMAIN)) == 1
assert hass.states.get("media_player.vizio").state == STATE_UNAVAILABLE
config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert config_entry.state is ConfigEntryState.SETUP_RETRY
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
@@ -377,7 +362,7 @@ async def test_services(hass: HomeAssistant) -> None:
"vol_up",
SERVICE_VOLUME_SET,
{ATTR_MEDIA_VOLUME_LEVEL: 1},
num=(100 - 15),
num=50, # From 50% to 100% = 50 steps (TV max volume 100, starting at 50)
)
await _test_service(
hass,
@@ -385,7 +370,7 @@ async def test_services(hass: HomeAssistant) -> None:
"vol_down",
SERVICE_VOLUME_SET,
{ATTR_MEDIA_VOLUME_LEVEL: 0},
num=(15 - 0),
num=100, # From 100% (after previous vol_up) to 0% = 100 steps
)
await _test_service(hass, MP_DOMAIN, "ch_up", SERVICE_MEDIA_NEXT_TRACK, None)
await _test_service(hass, MP_DOMAIN, "ch_down", SERVICE_MEDIA_PREVIOUS_TRACK, None)
@@ -444,66 +429,52 @@ async def test_options_update(hass: HomeAssistant) -> None:
)
async def _test_update_availability_switch(
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_update_available_to_unavailable(
hass: HomeAssistant,
initial_power_state: bool | None,
final_power_state: bool | None,
caplog: pytest.LogCaptureFixture,
freezer: FrozenDateTimeFactory,
) -> None:
now = dt_util.utcnow()
future_interval = timedelta(minutes=1)
"""Test device becomes unavailable after being available."""
await _test_setup_speaker(hass, True)
# Setup device as if time is right now
with freeze_time(now):
await _test_setup_speaker(hass, initial_power_state)
# Clear captured logs so that only availability state changes are captured for
# future assertion
caplog.clear()
# Fast forward time to future twice to trigger update and assert vizio log message
for i in range(1, 3):
future = now + (future_interval * i)
with (
patch(
"homeassistant.components.vizio.media_player.VizioAsync.get_power_state",
return_value=final_power_state,
),
freeze_time(future),
):
async_fire_time_changed(hass, future)
await hass.async_block_till_done()
if final_power_state is None:
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
else:
assert hass.states.get(ENTITY_ID).state != STATE_UNAVAILABLE
# Ensure connection status messages from vizio.media_player appear exactly once
# (on availability state change)
vizio_log_list = [
log
for log in caplog.records
if log.name == "homeassistant.components.vizio.media_player"
]
assert len(vizio_log_list) == 1
# Simulate device becoming unreachable
with patch(
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=None,
):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_update_unavailable_to_available(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test device becomes available after being unavailable."""
await _test_update_availability_switch(hass, None, True, caplog)
await _test_setup_speaker(hass, True)
# First, make device unavailable
with patch(
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=None,
):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get(ENTITY_ID).state == STATE_UNAVAILABLE
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_update_available_to_unavailable(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test device becomes unavailable after being available."""
await _test_update_availability_switch(hass, True, None, caplog)
# Then, make device available again
with patch(
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=True,
):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get(ENTITY_ID).state != STATE_UNAVAILABLE
@pytest.mark.usefixtures("vizio_connect", "vizio_update_with_apps")
@@ -619,11 +590,9 @@ async def test_setup_with_apps_additional_apps_config(
# Test that invalid app does nothing
with (
patch("homeassistant.components.vizio.VizioAsync.launch_app") as service_call1,
patch(
"homeassistant.components.vizio.media_player.VizioAsync.launch_app"
) as service_call1,
patch(
"homeassistant.components.vizio.media_player.VizioAsync.launch_app_config"
"homeassistant.components.vizio.VizioAsync.launch_app_config"
) as service_call2,
):
await hass.services.async_call(
@@ -679,7 +648,7 @@ async def test_setup_tv_without_mute(hass: HomeAssistant) -> None:
async with _cm_for_test_setup_without_apps(
{"volume": int(MAX_VOLUME[VIZIO_DEVICE_CLASS_TV] / 2)},
STATE_ON,
True,
):
await _add_config_entry_to_hass(hass, config_entry)
@@ -735,3 +704,122 @@ async def test_vizio_update_with_apps_on_input(hass: HomeAssistant) -> None:
attr = _get_attr_and_assert_base_attr(hass, MediaPlayerDeviceClass.TV, STATE_ON)
# app ID should not be in the attributes
assert "app_id" not in attr
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_coordinator_update_on_to_off(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test device transitions from on to off during coordinator refresh."""
await _test_setup_speaker(hass, True)
attr = _get_attr_and_assert_base_attr(
hass, MediaPlayerDeviceClass.SPEAKER, STATE_ON
)
assert attr[ATTR_MEDIA_VOLUME_LEVEL] is not None
assert ATTR_SOUND_MODE in attr
# Device turns off
with patch(
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=False,
):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get(ENTITY_ID).state == STATE_OFF
attr = hass.states.get(ENTITY_ID).attributes
assert attr.get(ATTR_MEDIA_VOLUME_LEVEL) is None
assert attr.get(ATTR_MEDIA_VOLUME_MUTED) is None
assert attr.get(ATTR_SOUND_MODE) is None
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_coordinator_update_off_to_on(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test device transitions from off to on during coordinator refresh."""
await _test_setup_speaker(hass, False)
assert hass.states.get(ENTITY_ID).state == STATE_OFF
# Device turns on
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get(ENTITY_ID).state == STATE_ON
attr = hass.states.get(ENTITY_ID).attributes
assert attr[ATTR_MEDIA_VOLUME_LEVEL] is not None
assert ATTR_SOUND_MODE in attr
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_sound_mode_feature_toggling(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test sound mode feature is added when present and removed when absent."""
await _test_setup_speaker(hass, True)
attr = _get_attr_and_assert_base_attr(
hass, MediaPlayerDeviceClass.SPEAKER, STATE_ON
)
assert ATTR_SOUND_MODE in attr
state = hass.states.get(ENTITY_ID)
assert (
state.attributes["supported_features"]
& MediaPlayerEntityFeature.SELECT_SOUND_MODE
)
# Update with audio settings that have no sound mode
with (
patch(
"homeassistant.components.vizio.VizioAsync.get_all_settings",
return_value={"volume": 50, "mute": "Off"},
),
patch(
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=True,
),
):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
state = hass.states.get(ENTITY_ID)
assert state.state == STATE_ON
assert not (
state.attributes["supported_features"]
& MediaPlayerEntityFeature.SELECT_SOUND_MODE
)
@pytest.mark.usefixtures("vizio_connect", "vizio_update")
async def test_sound_mode_list_cached(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test sound mode list is cached after first retrieval."""
await _test_setup_speaker(hass, True)
attr = hass.states.get(ENTITY_ID).attributes
assert attr["sound_mode_list"] == EQ_LIST
# Update with different sound mode options — cached list should persist
with (
patch(
"homeassistant.components.vizio.VizioAsync.get_setting_options",
return_value=["Different1", "Different2"],
),
patch(
"homeassistant.components.vizio.VizioAsync.get_power_state",
return_value=True,
),
):
freezer.tick(timedelta(minutes=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
attr = hass.states.get(ENTITY_ID).attributes
# Sound mode list should still be the original cached list
assert attr["sound_mode_list"] == EQ_LIST