mirror of
https://github.com/home-assistant/core.git
synced 2026-05-26 10:45:11 +02:00
Compare commits
17 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 36ee46ca8a | |||
| bfd36858f9 | |||
| 55d7892369 | |||
| 0539296683 | |||
| 94581d8ab6 | |||
| 7d6ec7fc58 | |||
| f49de3548e | |||
| 49ab42d3a2 | |||
| 383f6142f0 | |||
| 2f120cf604 | |||
| 37288849b3 | |||
| aa8659f507 | |||
| 40c0d79d1d | |||
| bef8632d78 | |||
| f00decfaa3 | |||
| 42e7add026 | |||
| 263aa3f16e |
@@ -5,8 +5,12 @@ from typing import Any
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components import labs, websocket_api
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
|
||||
from homeassistant.core import Event, HomeAssistant, callback
|
||||
from homeassistant.components.hassio import HassioNotReadyError
|
||||
from homeassistant.config_entries import SOURCE_SYSTEM, ConfigEntry
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.helpers import discovery_flow
|
||||
from homeassistant.helpers.start import async_at_started
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.util.hass_dict import HassKey
|
||||
|
||||
@@ -49,6 +53,7 @@ CONFIG_SCHEMA = vol.Schema(
|
||||
)
|
||||
|
||||
DATA_COMPONENT: HassKey[Analytics] = HassKey(DOMAIN)
|
||||
_DATA_SNAPSHOTS_URL: HassKey[str | None] = HassKey(f"{DOMAIN}_snapshots_url")
|
||||
|
||||
LABS_SNAPSHOT_FEATURE = "snapshots"
|
||||
|
||||
@@ -57,18 +62,39 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the analytics integration."""
|
||||
analytics_config = config.get(DOMAIN, {})
|
||||
|
||||
snapshots_url: str | None = None
|
||||
if CONF_SNAPSHOTS_URL in analytics_config:
|
||||
await labs.async_update_preview_feature(
|
||||
hass, DOMAIN, LABS_SNAPSHOT_FEATURE, enabled=True
|
||||
)
|
||||
snapshots_url = analytics_config[CONF_SNAPSHOTS_URL]
|
||||
else:
|
||||
snapshots_url = None
|
||||
|
||||
hass.data[_DATA_SNAPSHOTS_URL] = snapshots_url
|
||||
|
||||
discovery_flow.async_create_flow(
|
||||
hass, DOMAIN, context={"source": SOURCE_SYSTEM}, data={}
|
||||
)
|
||||
|
||||
websocket_api.async_register_command(hass, websocket_analytics)
|
||||
websocket_api.async_register_command(hass, websocket_analytics_preferences)
|
||||
|
||||
hass.http.register_view(AnalyticsDevicesView)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Set up Analytics from a config entry."""
|
||||
snapshots_url = hass.data.get(_DATA_SNAPSHOTS_URL)
|
||||
analytics = Analytics(hass, snapshots_url)
|
||||
|
||||
# Load stored data
|
||||
await analytics.load()
|
||||
try:
|
||||
await analytics.load()
|
||||
except HassioNotReadyError as err:
|
||||
raise ConfigEntryNotReady(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="supervisor_not_ready",
|
||||
) from err
|
||||
|
||||
started = False
|
||||
|
||||
@@ -80,26 +106,30 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
if started:
|
||||
await analytics.async_schedule()
|
||||
|
||||
async def start_schedule(_event: Event) -> None:
|
||||
"""Start the send schedule after the started event."""
|
||||
async def start_schedule(hass: HomeAssistant) -> None:
|
||||
"""Start the send schedule once Home Assistant has started."""
|
||||
nonlocal started
|
||||
started = True
|
||||
await analytics.async_schedule()
|
||||
|
||||
labs.async_subscribe_preview_feature(
|
||||
hass, DOMAIN, LABS_SNAPSHOT_FEATURE, _async_handle_labs_update
|
||||
entry.async_on_unload(
|
||||
labs.async_subscribe_preview_feature(
|
||||
hass, DOMAIN, LABS_SNAPSHOT_FEATURE, _async_handle_labs_update
|
||||
)
|
||||
)
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, start_schedule)
|
||||
|
||||
websocket_api.async_register_command(hass, websocket_analytics)
|
||||
websocket_api.async_register_command(hass, websocket_analytics_preferences)
|
||||
|
||||
hass.http.register_view(AnalyticsDevicesView)
|
||||
entry.async_on_unload(async_at_started(hass, start_schedule))
|
||||
|
||||
hass.data[DATA_COMPONENT] = analytics
|
||||
return True
|
||||
|
||||
|
||||
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
"""Unload an Analytics config entry."""
|
||||
analytics = hass.data.pop(DATA_COMPONENT)
|
||||
analytics.cancel_scheduled()
|
||||
return True
|
||||
|
||||
|
||||
@callback
|
||||
@websocket_api.require_admin
|
||||
@websocket_api.websocket_command({vol.Required("type"): "analytics"})
|
||||
@@ -109,7 +139,9 @@ def websocket_analytics(
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Return analytics preferences."""
|
||||
analytics = hass.data[DATA_COMPONENT]
|
||||
if (analytics := hass.data.get(DATA_COMPONENT)) is None:
|
||||
connection.send_error(msg["id"], websocket_api.ERR_NOT_FOUND, "Not loaded")
|
||||
return
|
||||
connection.send_result(
|
||||
msg["id"],
|
||||
{ATTR_PREFERENCES: analytics.preferences, ATTR_ONBOARDED: analytics.onboarded},
|
||||
@@ -130,8 +162,10 @@ async def websocket_analytics_preferences(
|
||||
msg: dict[str, Any],
|
||||
) -> None:
|
||||
"""Update analytics preferences."""
|
||||
if (analytics := hass.data.get(DATA_COMPONENT)) is None:
|
||||
connection.send_error(msg["id"], websocket_api.ERR_NOT_FOUND, "Not loaded")
|
||||
return
|
||||
preferences = msg[ATTR_PREFERENCES]
|
||||
analytics = hass.data[DATA_COMPONENT]
|
||||
|
||||
await analytics.save_preferences(preferences)
|
||||
await analytics.async_schedule()
|
||||
|
||||
@@ -299,12 +299,8 @@ class Analytics:
|
||||
self._data = AnalyticsData.from_dict(stored)
|
||||
|
||||
if self.supervisor and not self.onboarded:
|
||||
# This may raise HassioNotReadyError if Supervisor was unreachable
|
||||
# during setup of the Supervisor integration. That will fail setup
|
||||
# of this integration. However there is no better option at this time
|
||||
# since we need to get the diagnostic setting from Supervisor to correctly
|
||||
# setup this integration and we can't raise ConfigEntryNotReady to
|
||||
# trigger a retry from async_setup.
|
||||
# This may raise HassioNotReadyError if Supervisor was unreachable.
|
||||
# The caller is responsible for handling this and triggering a retry.
|
||||
supervisor_info = hassio.get_supervisor_info(self._hass)
|
||||
|
||||
# User have not configured analytics, get this setting from the supervisor
|
||||
@@ -349,8 +345,7 @@ class Analytics:
|
||||
await self._save()
|
||||
|
||||
if self.supervisor:
|
||||
# get_supervisor_info was called during setup so we can't get here
|
||||
# if it raised. The others may raise HassioNotReadyError if only some
|
||||
# The others may raise HassioNotReadyError if only some
|
||||
# data was successfully fetched from Supervisor
|
||||
supervisor_info = hassio.get_supervisor_info(hass)
|
||||
with contextlib.suppress(hassio.HassioNotReadyError):
|
||||
@@ -630,6 +625,16 @@ class Analytics:
|
||||
err,
|
||||
)
|
||||
|
||||
@callback
|
||||
def cancel_scheduled(self) -> None:
|
||||
"""Cancel all scheduled analytics tasks."""
|
||||
if self._basic_scheduled is not None:
|
||||
self._basic_scheduled()
|
||||
self._basic_scheduled = None
|
||||
if self._snapshot_scheduled is not None:
|
||||
self._snapshot_scheduled()
|
||||
self._snapshot_scheduled = None
|
||||
|
||||
async def async_schedule(self) -> None:
|
||||
"""Schedule analytics."""
|
||||
if not self.onboarded:
|
||||
|
||||
@@ -0,0 +1,19 @@
|
||||
"""Config flow for Analytics integration."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
|
||||
|
||||
from .const import DOMAIN
|
||||
|
||||
|
||||
class AnalyticsConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
"""Handle a config flow for Analytics."""
|
||||
|
||||
VERSION = 1
|
||||
|
||||
async def async_step_system(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle the initial step."""
|
||||
return self.async_create_entry(title="Analytics", data={})
|
||||
@@ -3,6 +3,7 @@
|
||||
"name": "Analytics",
|
||||
"after_dependencies": ["energy", "hassio", "recorder"],
|
||||
"codeowners": ["@home-assistant/core"],
|
||||
"config_flow": true,
|
||||
"dependencies": ["api", "websocket_api", "http"],
|
||||
"documentation": "https://www.home-assistant.io/integrations/analytics",
|
||||
"integration_type": "system",
|
||||
@@ -14,5 +15,6 @@
|
||||
"report_issue_url": "https://github.com/OHF-Device-Database/device-database/issues/new"
|
||||
}
|
||||
},
|
||||
"quality_scale": "internal"
|
||||
"quality_scale": "internal",
|
||||
"single_config_entry": true
|
||||
}
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
{
|
||||
"exceptions": {
|
||||
"supervisor_not_ready": {
|
||||
"message": "Supervisor was not ready during setup, will retry"
|
||||
}
|
||||
},
|
||||
"preview_features": {
|
||||
"snapshots": {
|
||||
"description": "We're creating the [Open Home Foundation Device Database](https://www.home-assistant.io/blog/2026/02/02/about-device-database/): a free, open source community-powered resource to help users find practical information about how smart home devices perform in real installations.\n\nYou can help us build it by opting in to share anonymized data about your devices. This data will only ever include device-specific details (like model or manufacturer) – never personally identifying information (like the names you assign).\n\nFind out how we process your data (should you choose to contribute) in our [Data Use Statement](https://www.openhomefoundation.org/device-database-data-use-statement).",
|
||||
|
||||
@@ -19,7 +19,9 @@ EXPECTED_ENTRY_VERSION = (
|
||||
@callback
|
||||
def async_info(hass: HomeAssistant) -> list[HardwareInfo]:
|
||||
"""Return board info."""
|
||||
entries = hass.config_entries.async_entries(DOMAIN)
|
||||
entries = hass.config_entries.async_entries(
|
||||
DOMAIN, include_ignore=False, include_disabled=False
|
||||
)
|
||||
return [
|
||||
HardwareInfo(
|
||||
board=None,
|
||||
|
||||
@@ -35,7 +35,6 @@ from homeassistant.helpers.device_registry import DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
from homeassistant.helpers.typing import StateType
|
||||
from homeassistant.util.dt import utcnow
|
||||
from homeassistant.util.variance import ignore_variance
|
||||
|
||||
from .const import DOMAIN
|
||||
from .coordinator import HomeWizardConfigEntry, HWEnergyDeviceUpdateCoordinator
|
||||
@@ -66,13 +65,6 @@ def to_percentage(value: float | None) -> float | None:
|
||||
return value * 100 if value is not None else None
|
||||
|
||||
|
||||
def uptime_to_datetime(value: int) -> datetime:
|
||||
"""Convert seconds to datetime timestamp."""
|
||||
return utcnow().replace(microsecond=0) - timedelta(seconds=value)
|
||||
|
||||
|
||||
uptime_to_stable_datetime = ignore_variance(uptime_to_datetime, timedelta(minutes=5))
|
||||
|
||||
SENSORS: Final[tuple[HomeWizardSensorEntityDescription, ...]] = (
|
||||
HomeWizardSensorEntityDescription(
|
||||
key="smr_version",
|
||||
@@ -643,7 +635,7 @@ SENSORS: Final[tuple[HomeWizardSensorEntityDescription, ...]] = (
|
||||
HomeWizardSensorEntityDescription(
|
||||
key="uptime",
|
||||
translation_key="uptime",
|
||||
device_class=SensorDeviceClass.TIMESTAMP,
|
||||
device_class=SensorDeviceClass.UPTIME,
|
||||
entity_category=EntityCategory.DIAGNOSTIC,
|
||||
entity_registry_enabled_default=False,
|
||||
has_fn=(
|
||||
@@ -651,7 +643,7 @@ SENSORS: Final[tuple[HomeWizardSensorEntityDescription, ...]] = (
|
||||
),
|
||||
value_fn=(
|
||||
lambda data: (
|
||||
uptime_to_stable_datetime(data.system.uptime_s)
|
||||
utcnow() - timedelta(seconds=data.system.uptime_s)
|
||||
if data.system is not None and data.system.uptime_s is not None
|
||||
else None
|
||||
)
|
||||
|
||||
@@ -87,6 +87,8 @@ def async_get_triggers(
|
||||
|
||||
# Get Hue device id from device identifier
|
||||
hue_dev_id = get_hue_device_id(device_entry)
|
||||
if hue_dev_id is None or hue_dev_id not in api.devices:
|
||||
return []
|
||||
# extract triggers from all button resources of this Hue device
|
||||
triggers: list[dict[str, Any]] = []
|
||||
model_id = api.devices[hue_dev_id].product_data.product_name
|
||||
|
||||
@@ -118,6 +118,8 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
)
|
||||
device = devices.add_x10_device(housecode, unitcode, x10_type, steps)
|
||||
|
||||
create_insteon_device(hass, devices.modem, entry.entry_id)
|
||||
|
||||
await hass.config_entries.async_forward_entry_setups(entry, INSTEON_PLATFORMS)
|
||||
|
||||
for address in devices:
|
||||
@@ -131,8 +133,6 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
register_new_device_callback(hass)
|
||||
async_setup_services(hass)
|
||||
|
||||
create_insteon_device(hass, devices.modem, entry.entry_id)
|
||||
|
||||
entry.async_create_background_task(
|
||||
hass, async_get_device_config(hass, entry), "insteon-get-device-config"
|
||||
)
|
||||
|
||||
@@ -241,7 +241,7 @@ def preprocess_turn_on_alternatives(
|
||||
|
||||
if (color_name := params.pop(ATTR_COLOR_NAME, None)) is not None:
|
||||
try:
|
||||
params[ATTR_RGB_COLOR] = color_util.color_name_to_rgb(color_name)
|
||||
params[ATTR_RGB_COLOR] = tuple(color_util.color_name_to_rgb(color_name))
|
||||
except ValueError:
|
||||
_LOGGER.warning("Got unknown color %s, falling back to white", color_name)
|
||||
params[ATTR_RGB_COLOR] = (255, 255, 255)
|
||||
|
||||
@@ -60,7 +60,7 @@ def get_matter_device_info(
|
||||
return None
|
||||
|
||||
return MatterDeviceInfo(
|
||||
unique_id=node.device_info.uniqueID,
|
||||
unique_id=node.device_info.uniqueID or "",
|
||||
vendor_id=hex(node.device_info.vendorID),
|
||||
product_id=hex(node.device_info.productID),
|
||||
)
|
||||
|
||||
@@ -6,13 +6,14 @@ from typing import Any
|
||||
from chip.clusters import Objects
|
||||
from matter_server.common.helpers.util import dataclass_to_dict, parse_attribute_path
|
||||
|
||||
from homeassistant.components.diagnostics import REDACTED
|
||||
from homeassistant.components.diagnostics import REDACTED, async_redact_data
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import device_registry as dr
|
||||
|
||||
from .helpers import MatterConfigEntry, get_matter, get_node_from_device_entry
|
||||
|
||||
ATTRIBUTES_TO_REDACT = {Objects.BasicInformation.Attributes.Location}
|
||||
SERVER_INFO_TO_REDACT = {"wifi_ssid"}
|
||||
|
||||
|
||||
def redact_matter_attributes(node_data: dict[str, Any]) -> dict[str, Any]:
|
||||
@@ -44,6 +45,7 @@ async def async_get_config_entry_diagnostics(
|
||||
matter = get_matter(hass)
|
||||
server_diagnostics = await matter.matter_client.get_diagnostics()
|
||||
data = dataclass_to_dict(server_diagnostics)
|
||||
data["info"] = async_redact_data(data["info"], SERVER_INFO_TO_REDACT)
|
||||
nodes = [redact_matter_attributes(node_data) for node_data in data["nodes"]]
|
||||
data["nodes"] = nodes
|
||||
|
||||
@@ -59,7 +61,9 @@ async def async_get_device_diagnostics(
|
||||
node = get_node_from_device_entry(hass, device)
|
||||
|
||||
return {
|
||||
"server_info": dataclass_to_dict(server_diagnostics.info),
|
||||
"server_info": async_redact_data(
|
||||
dataclass_to_dict(server_diagnostics.info), SERVER_INFO_TO_REDACT
|
||||
),
|
||||
"node": redact_matter_attributes(
|
||||
remove_serialization_type(dataclass_to_dict(node.node_data) if node else {})
|
||||
),
|
||||
|
||||
@@ -8,6 +8,6 @@
|
||||
"documentation": "https://www.home-assistant.io/integrations/matter",
|
||||
"integration_type": "hub",
|
||||
"iot_class": "local_push",
|
||||
"requirements": ["matter-python-client==0.6.0"],
|
||||
"requirements": ["matter-python-client==0.7.1"],
|
||||
"zeroconf": ["_matter._tcp.local.", "_matterc._udp.local."]
|
||||
}
|
||||
|
||||
@@ -108,6 +108,7 @@ ABBREVIATIONS = {
|
||||
"mode_stat_t": "mode_state_topic",
|
||||
"mode_stat_tpl": "mode_state_template",
|
||||
"modes": "modes",
|
||||
"msg_exp_int": "message_expiry_interval",
|
||||
"name": "name",
|
||||
"o": "origin",
|
||||
"off_dly": "off_delay",
|
||||
|
||||
@@ -120,6 +120,8 @@ from homeassistant.helpers.hassio import is_hassio
|
||||
from homeassistant.helpers.json import json_dumps
|
||||
from homeassistant.helpers.selector import (
|
||||
BooleanSelector,
|
||||
DurationSelector,
|
||||
DurationSelectorConfig,
|
||||
FileSelector,
|
||||
FileSelectorConfig,
|
||||
NumberSelector,
|
||||
@@ -227,6 +229,7 @@ from .const import (
|
||||
CONF_LAST_RESET_VALUE_TEMPLATE,
|
||||
CONF_MAX,
|
||||
CONF_MAX_KELVIN,
|
||||
CONF_MESSAGE_EXPIRY_INTERVAL,
|
||||
CONF_MIN,
|
||||
CONF_MIN_KELVIN,
|
||||
CONF_MODE_COMMAND_TEMPLATE,
|
||||
@@ -3721,6 +3724,11 @@ MQTT_DEVICE_PLATFORM_FIELDS = {
|
||||
default=DEFAULT_QOS,
|
||||
section="mqtt_settings",
|
||||
),
|
||||
CONF_MESSAGE_EXPIRY_INTERVAL: PlatformField(
|
||||
selector=DurationSelector(DurationSelectorConfig(enable_day=True)),
|
||||
required=False,
|
||||
section="mqtt_settings",
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -49,6 +49,7 @@ CONF_IMAGE_TOPIC = "image_topic"
|
||||
CONF_JSON_ATTRS_TOPIC = "json_attributes_topic"
|
||||
CONF_JSON_ATTRS_TEMPLATE = "json_attributes_template"
|
||||
CONF_KEEPALIVE = "keepalive"
|
||||
CONF_MESSAGE_EXPIRY_INTERVAL = "message_expiry_interval"
|
||||
CONF_ORIGIN = "origin"
|
||||
CONF_QOS = ATTR_QOS
|
||||
CONF_RETAIN = ATTR_RETAIN
|
||||
|
||||
@@ -17,7 +17,7 @@ from .models import DATA_MQTT, PublishPayloadType
|
||||
STORED_MESSAGES = 10
|
||||
|
||||
|
||||
@dataclass
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class TimestampedPublishMessage:
|
||||
"""MQTT Message."""
|
||||
|
||||
@@ -26,6 +26,8 @@ class TimestampedPublishMessage:
|
||||
qos: int
|
||||
retain: bool
|
||||
timestamp: float
|
||||
encoding: str | None
|
||||
kwargs: dict[str, Any]
|
||||
|
||||
|
||||
def log_message(
|
||||
@@ -35,6 +37,8 @@ def log_message(
|
||||
payload: PublishPayloadType,
|
||||
qos: int,
|
||||
retain: bool,
|
||||
encoding: str | None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Log an outgoing MQTT message."""
|
||||
entity_info = hass.data[DATA_MQTT].debug_info_entities.setdefault(
|
||||
@@ -45,7 +49,13 @@ def log_message(
|
||||
"messages": deque(maxlen=STORED_MESSAGES),
|
||||
}
|
||||
msg = TimestampedPublishMessage(
|
||||
topic, payload, qos, retain, timestamp=time.monotonic()
|
||||
topic,
|
||||
payload,
|
||||
qos,
|
||||
retain,
|
||||
timestamp=time.monotonic(),
|
||||
encoding=encoding,
|
||||
kwargs=kwargs,
|
||||
)
|
||||
entity_info["transmitted"][topic]["messages"].append(msg)
|
||||
|
||||
|
||||
@@ -84,6 +84,7 @@ from .const import (
|
||||
CONF_JSON_ATTRS_TEMPLATE,
|
||||
CONF_JSON_ATTRS_TOPIC,
|
||||
CONF_MANUFACTURER,
|
||||
CONF_MESSAGE_EXPIRY_INTERVAL,
|
||||
CONF_PAYLOAD_AVAILABLE,
|
||||
CONF_PAYLOAD_NOT_AVAILABLE,
|
||||
CONF_QOS,
|
||||
@@ -94,7 +95,6 @@ from .const import (
|
||||
CONF_SW_VERSION,
|
||||
CONF_TOPIC,
|
||||
CONF_VIA_DEVICE,
|
||||
DEFAULT_ENCODING,
|
||||
DOMAIN,
|
||||
MQTT_CONNECTION_STATE,
|
||||
)
|
||||
@@ -153,6 +153,8 @@ MQTT_ATTRIBUTES_BLOCKED = {
|
||||
"unit_of_measurement",
|
||||
}
|
||||
|
||||
PUBLISH_KWARGS = (CONF_MESSAGE_EXPIRY_INTERVAL,)
|
||||
|
||||
|
||||
@callback
|
||||
def async_handle_schema_error(
|
||||
@@ -1539,36 +1541,20 @@ class MqttEntity(
|
||||
await MqttDiscoveryUpdateMixin.async_will_remove_from_hass(self)
|
||||
debug_info.remove_entity_data(self.hass, self.entity_id)
|
||||
|
||||
async def async_publish(
|
||||
self,
|
||||
topic: str,
|
||||
payload: PublishPayloadType,
|
||||
qos: int = 0,
|
||||
retain: bool = False,
|
||||
encoding: str | None = DEFAULT_ENCODING,
|
||||
) -> None:
|
||||
"""Publish message to an MQTT topic."""
|
||||
log_message(self.hass, self.entity_id, topic, payload, qos, retain)
|
||||
await async_publish(
|
||||
self.hass,
|
||||
topic,
|
||||
payload,
|
||||
qos,
|
||||
retain,
|
||||
encoding,
|
||||
)
|
||||
|
||||
async def async_publish_with_config(
|
||||
self, topic: str, payload: PublishPayloadType
|
||||
) -> None:
|
||||
"""Publish payload to a topic using config."""
|
||||
await self.async_publish(
|
||||
topic,
|
||||
payload,
|
||||
self._config[CONF_QOS],
|
||||
self._config[CONF_RETAIN],
|
||||
self._config[CONF_ENCODING],
|
||||
kwargs: dict[str, Any] = {
|
||||
key: value for key, value in self._config.items() if key in PUBLISH_KWARGS
|
||||
}
|
||||
qos: int = self._config[CONF_QOS]
|
||||
retain: bool = self._config[CONF_RETAIN]
|
||||
encoding: str = self._config[CONF_ENCODING]
|
||||
log_message(
|
||||
self.hass, self.entity_id, topic, payload, qos, retain, encoding, **kwargs
|
||||
)
|
||||
await async_publish(self.hass, topic, payload, qos, retain, encoding, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
|
||||
@@ -509,10 +509,20 @@ class MqttComponentConfig:
|
||||
discovery_payload: MQTTDiscoveryPayload
|
||||
|
||||
|
||||
class MessageExpiryInterval(TypedDict, total=False):
|
||||
"""Hold the Message Expiry Interval."""
|
||||
|
||||
days: float
|
||||
hours: float
|
||||
minutes: float
|
||||
seconds: float
|
||||
|
||||
|
||||
class DeviceMqttOptions(TypedDict, total=False):
|
||||
"""Hold the shared MQTT specific options for an MQTT device."""
|
||||
|
||||
qos: int
|
||||
message_expiry_interval: MessageExpiryInterval
|
||||
|
||||
|
||||
class MqttDeviceData(TypedDict, total=False):
|
||||
|
||||
@@ -40,6 +40,7 @@ from .const import (
|
||||
CONF_JSON_ATTRS_TEMPLATE,
|
||||
CONF_JSON_ATTRS_TOPIC,
|
||||
CONF_MANUFACTURER,
|
||||
CONF_MESSAGE_EXPIRY_INTERVAL,
|
||||
CONF_ORIGIN,
|
||||
CONF_PAYLOAD_AVAILABLE,
|
||||
CONF_PAYLOAD_NOT_AVAILABLE,
|
||||
@@ -66,6 +67,7 @@ SHARED_OPTIONS = [
|
||||
CONF_AVAILABILITY_TOPIC,
|
||||
CONF_COMMAND_TOPIC,
|
||||
CONF_ENCODING,
|
||||
CONF_MESSAGE_EXPIRY_INTERVAL,
|
||||
CONF_PAYLOAD_AVAILABLE,
|
||||
CONF_PAYLOAD_NOT_AVAILABLE,
|
||||
CONF_STATE_TOPIC,
|
||||
@@ -161,6 +163,14 @@ MQTT_ORIGIN_INFO_SCHEMA = vol.All(
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def valid_message_expiry_interval(value: Any) -> int:
|
||||
"""Return Message Expiry Interval in seconds."""
|
||||
if isinstance(value, int):
|
||||
return cv.positive_int(value) # type: ignore[no-any-return]
|
||||
return int(cv.positive_time_period_dict(value).total_seconds())
|
||||
|
||||
|
||||
MQTT_ENTITY_COMMON_SCHEMA = _MQTT_AVAILABILITY_SCHEMA.extend(
|
||||
{
|
||||
vol.Optional(CONF_DEVICE): MQTT_ENTITY_DEVICE_INFO_SCHEMA,
|
||||
@@ -172,6 +182,7 @@ MQTT_ENTITY_COMMON_SCHEMA = _MQTT_AVAILABILITY_SCHEMA.extend(
|
||||
vol.Optional(CONF_JSON_ATTRS_TOPIC): valid_subscribe_topic,
|
||||
vol.Optional(CONF_JSON_ATTRS_TEMPLATE): cv.template,
|
||||
vol.Optional(CONF_DEFAULT_ENTITY_ID): cv.string,
|
||||
vol.Optional(CONF_MESSAGE_EXPIRY_INTERVAL): valid_message_expiry_interval,
|
||||
vol.Optional(CONF_UNIQUE_ID): cv.string,
|
||||
}
|
||||
)
|
||||
@@ -203,6 +214,7 @@ DEVICE_DISCOVERY_SCHEMA = _MQTT_AVAILABILITY_SCHEMA.extend(
|
||||
vol.Required(CONF_ORIGIN): MQTT_ORIGIN_INFO_SCHEMA,
|
||||
vol.Optional(CONF_STATE_TOPIC): valid_subscribe_topic,
|
||||
vol.Optional(CONF_COMMAND_TOPIC): valid_publish_topic,
|
||||
vol.Optional(CONF_MESSAGE_EXPIRY_INTERVAL): valid_message_expiry_interval,
|
||||
vol.Optional(CONF_QOS): valid_qos_schema,
|
||||
vol.Optional(CONF_ENCODING): cv.string,
|
||||
}
|
||||
|
||||
@@ -197,9 +197,11 @@
|
||||
},
|
||||
"mqtt_settings": {
|
||||
"data": {
|
||||
"message_expiry_interval": "Message Expiry Interval",
|
||||
"qos": "QoS"
|
||||
},
|
||||
"data_description": {
|
||||
"message_expiry_interval": "Retention time interval for published message.",
|
||||
"qos": "The Quality of Service value the device's entities should use."
|
||||
},
|
||||
"name": "MQTT settings"
|
||||
|
||||
@@ -6,6 +6,7 @@ from homeassistant.components.binary_sensor import (
|
||||
BinarySensorDeviceClass,
|
||||
BinarySensorEntity,
|
||||
)
|
||||
from homeassistant.const import ATTR_ID
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
|
||||
@@ -14,7 +15,6 @@ from .const import (
|
||||
ATTR_DESCRIPTION,
|
||||
ATTR_EXPIRES,
|
||||
ATTR_HEADLINE,
|
||||
ATTR_ID,
|
||||
ATTR_RECOMMENDED_ACTIONS,
|
||||
ATTR_SENDER,
|
||||
ATTR_SENT,
|
||||
|
||||
@@ -29,8 +29,6 @@ ATTR_SEVERITY: str = "severity"
|
||||
ATTR_RECOMMENDED_ACTIONS: str = "recommended_actions"
|
||||
ATTR_AFFECTED_AREAS: str = "affected_areas"
|
||||
ATTR_WEB: str = "web"
|
||||
# pylint: disable-next=home-assistant-duplicate-const
|
||||
ATTR_ID: str = "id"
|
||||
ATTR_SENT: str = "sent"
|
||||
ATTR_START: str = "start"
|
||||
ATTR_EXPIRES: str = "expires"
|
||||
|
||||
@@ -37,11 +37,15 @@ class OpenhomeConfigFlow(ConfigFlow, domain=DOMAIN):
|
||||
_LOGGER.debug("async_step_ssdp: Incomplete discovery, ignoring")
|
||||
return self.async_abort(reason="incomplete_discovery")
|
||||
|
||||
_LOGGER.debug(
|
||||
"async_step_ssdp: setting unique id %s", discovery_info.upnp[ATTR_UPNP_UDN]
|
||||
)
|
||||
udn = discovery_info.upnp[ATTR_UPNP_UDN]
|
||||
if isinstance(udn, list):
|
||||
if not udn:
|
||||
return self.async_abort(reason="incomplete_discovery")
|
||||
udn = udn[0]
|
||||
|
||||
await self.async_set_unique_id(discovery_info.upnp[ATTR_UPNP_UDN])
|
||||
_LOGGER.debug("async_step_ssdp: setting unique id %s", udn)
|
||||
|
||||
await self.async_set_unique_id(udn)
|
||||
self._abort_if_unique_id_configured({CONF_HOST: discovery_info.ssdp_location})
|
||||
|
||||
_LOGGER.debug(
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
"""The System Bridge integration."""
|
||||
|
||||
import asyncio
|
||||
from dataclasses import asdict
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from systembridgeconnector.exceptions import (
|
||||
AuthenticationException,
|
||||
@@ -11,71 +9,34 @@ from systembridgeconnector.exceptions import (
|
||||
ConnectionErrorException,
|
||||
DataMissingException,
|
||||
)
|
||||
from systembridgeconnector.models.keyboard_key import KeyboardKey
|
||||
from systembridgeconnector.models.keyboard_text import KeyboardText
|
||||
from systembridgeconnector.models.modules.processes import Process
|
||||
from systembridgeconnector.models.open_path import OpenPath
|
||||
from systembridgeconnector.models.open_url import OpenUrl
|
||||
from systembridgeconnector.version import Version
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry, ConfigEntryState
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
CONF_API_KEY,
|
||||
CONF_COMMAND,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_HOST,
|
||||
CONF_ID,
|
||||
CONF_NAME,
|
||||
CONF_PATH,
|
||||
CONF_PORT,
|
||||
CONF_TOKEN,
|
||||
CONF_URL,
|
||||
Platform,
|
||||
)
|
||||
from homeassistant.core import (
|
||||
HomeAssistant,
|
||||
ServiceCall,
|
||||
ServiceResponse,
|
||||
SupportsResponse,
|
||||
)
|
||||
from homeassistant.exceptions import (
|
||||
ConfigEntryAuthFailed,
|
||||
ConfigEntryNotReady,
|
||||
HomeAssistantError,
|
||||
ServiceValidationError,
|
||||
)
|
||||
from homeassistant.helpers import (
|
||||
config_validation as cv,
|
||||
device_registry as dr,
|
||||
discovery,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||
from homeassistant.helpers import config_validation as cv, discovery
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from .config_flow import SystemBridgeConfigFlow
|
||||
from .const import DATA_WAIT_TIMEOUT, DOMAIN, MODULES
|
||||
from .coordinator import SystemBridgeConfigEntry, SystemBridgeDataUpdateCoordinator
|
||||
|
||||
|
||||
def _get_coordinator(
|
||||
hass: HomeAssistant, entry_id: str
|
||||
) -> SystemBridgeDataUpdateCoordinator:
|
||||
"""Return the coordinator for a config entry id."""
|
||||
entry: SystemBridgeConfigEntry | None = hass.config_entries.async_get_entry(
|
||||
entry_id
|
||||
)
|
||||
if entry is None or entry.state is not ConfigEntryState.LOADED:
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="device_not_found",
|
||||
translation_placeholders={"device": entry_id},
|
||||
)
|
||||
return entry.runtime_data
|
||||
|
||||
from .services import async_setup_services
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
|
||||
|
||||
PLATFORMS = [
|
||||
Platform.BINARY_SENSOR,
|
||||
Platform.MEDIA_PLAYER,
|
||||
@@ -84,26 +45,12 @@ PLATFORMS = [
|
||||
Platform.UPDATE,
|
||||
]
|
||||
|
||||
CONF_BRIDGE = "bridge"
|
||||
CONF_KEY = "key"
|
||||
CONF_TEXT = "text"
|
||||
|
||||
SERVICE_GET_PROCESS_BY_ID = "get_process_by_id"
|
||||
SERVICE_GET_PROCESSES_BY_NAME = "get_processes_by_name"
|
||||
SERVICE_OPEN_PATH = "open_path"
|
||||
SERVICE_POWER_COMMAND = "power_command"
|
||||
SERVICE_OPEN_URL = "open_url"
|
||||
SERVICE_SEND_KEYPRESS = "send_keypress"
|
||||
SERVICE_SEND_TEXT = "send_text"
|
||||
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Set up the System Bridge services."""
|
||||
|
||||
POWER_COMMAND_MAP = {
|
||||
"hibernate": "power_hibernate",
|
||||
"lock": "power_lock",
|
||||
"logout": "power_logout",
|
||||
"restart": "power_restart",
|
||||
"shutdown": "power_shutdown",
|
||||
"sleep": "power_sleep",
|
||||
}
|
||||
async_setup_services(hass)
|
||||
return True
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
@@ -231,219 +178,6 @@ async def async_setup_entry(
|
||||
)
|
||||
)
|
||||
|
||||
if hass.services.has_service(DOMAIN, SERVICE_OPEN_URL):
|
||||
return True
|
||||
|
||||
def valid_device(device: str) -> str:
|
||||
"""Check device is valid."""
|
||||
device_registry = dr.async_get(hass)
|
||||
device_entry = device_registry.async_get(device)
|
||||
if device_entry is not None:
|
||||
try:
|
||||
return next(
|
||||
entry.entry_id
|
||||
for entry in hass.config_entries.async_entries(DOMAIN)
|
||||
if entry.entry_id in device_entry.config_entries
|
||||
)
|
||||
except StopIteration as exception:
|
||||
raise HomeAssistantError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="device_not_found",
|
||||
translation_placeholders={"device": device},
|
||||
) from exception
|
||||
raise HomeAssistantError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="device_not_found",
|
||||
translation_placeholders={"device": device},
|
||||
)
|
||||
|
||||
async def handle_get_process_by_id(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the get process by id service call."""
|
||||
_LOGGER.debug("Get process by id: %s", service_call.data)
|
||||
coordinator = _get_coordinator(hass, service_call.data[CONF_BRIDGE])
|
||||
processes: list[Process] = coordinator.data.processes
|
||||
|
||||
# Find process.id from list, raise ServiceValidationError if not found
|
||||
try:
|
||||
return asdict(
|
||||
next(
|
||||
process
|
||||
for process in processes
|
||||
if process.id == service_call.data[CONF_ID]
|
||||
)
|
||||
)
|
||||
except StopIteration as exception:
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="process_not_found",
|
||||
translation_placeholders={"id": service_call.data[CONF_ID]},
|
||||
) from exception
|
||||
|
||||
async def handle_get_processes_by_name(
|
||||
service_call: ServiceCall,
|
||||
) -> ServiceResponse:
|
||||
"""Handle the get process by name service call."""
|
||||
_LOGGER.debug("Get process by name: %s", service_call.data)
|
||||
coordinator = _get_coordinator(hass, service_call.data[CONF_BRIDGE])
|
||||
|
||||
# Find processes from list
|
||||
items: list[dict[str, Any]] = [
|
||||
asdict(process)
|
||||
for process in coordinator.data.processes
|
||||
if process.name is not None
|
||||
and service_call.data[CONF_NAME].lower() in process.name.lower()
|
||||
]
|
||||
|
||||
return {
|
||||
"count": len(items),
|
||||
"processes": list(items),
|
||||
}
|
||||
|
||||
async def handle_open_path(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the open path service call."""
|
||||
_LOGGER.debug("Open path: %s", service_call.data)
|
||||
coordinator = _get_coordinator(hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.open_path(
|
||||
OpenPath(path=service_call.data[CONF_PATH])
|
||||
)
|
||||
return asdict(response)
|
||||
|
||||
async def handle_power_command(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the power command service call."""
|
||||
_LOGGER.debug("Power command: %s", service_call.data)
|
||||
coordinator = _get_coordinator(hass, service_call.data[CONF_BRIDGE])
|
||||
response = await getattr(
|
||||
coordinator.websocket_client,
|
||||
POWER_COMMAND_MAP[service_call.data[CONF_COMMAND]],
|
||||
)()
|
||||
return asdict(response)
|
||||
|
||||
async def handle_open_url(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the open url service call."""
|
||||
_LOGGER.debug("Open URL: %s", service_call.data)
|
||||
coordinator = _get_coordinator(hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.open_url(
|
||||
OpenUrl(url=service_call.data[CONF_URL])
|
||||
)
|
||||
return asdict(response)
|
||||
|
||||
async def handle_send_keypress(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the send_keypress service call."""
|
||||
coordinator = _get_coordinator(hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.keyboard_keypress(
|
||||
KeyboardKey(key=service_call.data[CONF_KEY])
|
||||
)
|
||||
return asdict(response)
|
||||
|
||||
async def handle_send_text(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the send_keypress service call."""
|
||||
coordinator = _get_coordinator(hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.keyboard_text(
|
||||
KeyboardText(text=service_call.data[CONF_TEXT])
|
||||
)
|
||||
return asdict(response)
|
||||
|
||||
# pylint: disable-next=home-assistant-service-registered-in-setup-entry
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
SERVICE_GET_PROCESS_BY_ID,
|
||||
handle_get_process_by_id,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): valid_device,
|
||||
vol.Required(CONF_ID): cv.positive_int,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
# pylint: disable-next=home-assistant-service-registered-in-setup-entry
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
SERVICE_GET_PROCESSES_BY_NAME,
|
||||
handle_get_processes_by_name,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): valid_device,
|
||||
vol.Required(CONF_NAME): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
# pylint: disable-next=home-assistant-service-registered-in-setup-entry
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
SERVICE_OPEN_PATH,
|
||||
handle_open_path,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): valid_device,
|
||||
vol.Required(CONF_PATH): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
# pylint: disable-next=home-assistant-service-registered-in-setup-entry
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
SERVICE_POWER_COMMAND,
|
||||
handle_power_command,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): valid_device,
|
||||
vol.Required(CONF_COMMAND): vol.In(POWER_COMMAND_MAP),
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
# pylint: disable-next=home-assistant-service-registered-in-setup-entry
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
SERVICE_OPEN_URL,
|
||||
handle_open_url,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): valid_device,
|
||||
vol.Required(CONF_URL): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
# pylint: disable-next=home-assistant-service-registered-in-setup-entry
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
SERVICE_SEND_KEYPRESS,
|
||||
handle_send_keypress,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): valid_device,
|
||||
vol.Required(CONF_KEY): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
description_placeholders={
|
||||
"syntax_keys_documentation_url": "http://robotjs.io/docs/syntax#keys"
|
||||
},
|
||||
)
|
||||
|
||||
# pylint: disable-next=home-assistant-service-registered-in-setup-entry
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
SERVICE_SEND_TEXT,
|
||||
handle_send_text,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): valid_device,
|
||||
vol.Required(CONF_TEXT): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
# Reload entry when its updated.
|
||||
entry.async_on_unload(entry.add_update_listener(async_reload_entry))
|
||||
|
||||
|
||||
@@ -0,0 +1,269 @@
|
||||
"""Service registration for System Bridge integration."""
|
||||
|
||||
from dataclasses import asdict
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from systembridgeconnector.models.keyboard_key import KeyboardKey
|
||||
from systembridgeconnector.models.keyboard_text import KeyboardText
|
||||
from systembridgeconnector.models.modules.processes import Process
|
||||
from systembridgeconnector.models.open_path import OpenPath
|
||||
from systembridgeconnector.models.open_url import OpenUrl
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.const import CONF_COMMAND, CONF_ID, CONF_NAME, CONF_PATH, CONF_URL
|
||||
from homeassistant.core import (
|
||||
HomeAssistant,
|
||||
ServiceCall,
|
||||
ServiceResponse,
|
||||
SupportsResponse,
|
||||
callback,
|
||||
)
|
||||
from homeassistant.exceptions import ServiceValidationError
|
||||
from homeassistant.helpers import (
|
||||
config_validation as cv,
|
||||
device_registry as dr,
|
||||
service,
|
||||
)
|
||||
|
||||
from .const import DOMAIN
|
||||
from .coordinator import SystemBridgeConfigEntry, SystemBridgeDataUpdateCoordinator
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
CONF_BRIDGE = "bridge"
|
||||
CONF_KEY = "key"
|
||||
CONF_TEXT = "text"
|
||||
|
||||
POWER_COMMAND_MAP = {
|
||||
"hibernate": "power_hibernate",
|
||||
"lock": "power_lock",
|
||||
"logout": "power_logout",
|
||||
"restart": "power_restart",
|
||||
"shutdown": "power_shutdown",
|
||||
"sleep": "power_sleep",
|
||||
}
|
||||
|
||||
|
||||
@callback
|
||||
def async_setup_services(hass: HomeAssistant) -> None:
|
||||
"""Set up services for System Bridge integration."""
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"get_process_by_id",
|
||||
handle_get_process_by_id,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): cv.string,
|
||||
vol.Required(CONF_ID): cv.positive_int,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"get_processes_by_name",
|
||||
handle_get_processes_by_name,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): cv.string,
|
||||
vol.Required(CONF_NAME): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"open_path",
|
||||
handle_open_path,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): cv.string,
|
||||
vol.Required(CONF_PATH): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"power_command",
|
||||
handle_power_command,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): cv.string,
|
||||
vol.Required(CONF_COMMAND): vol.In(POWER_COMMAND_MAP),
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"open_url",
|
||||
handle_open_url,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): cv.string,
|
||||
vol.Required(CONF_URL): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"send_keypress",
|
||||
handle_send_keypress,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): cv.string,
|
||||
vol.Required(CONF_KEY): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
description_placeholders={
|
||||
"syntax_keys_documentation_url": "https://robotjs.dev/docs/syntax#keys"
|
||||
},
|
||||
)
|
||||
|
||||
hass.services.async_register(
|
||||
DOMAIN,
|
||||
"send_text",
|
||||
handle_send_text,
|
||||
schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_BRIDGE): cv.string,
|
||||
vol.Required(CONF_TEXT): cv.string,
|
||||
},
|
||||
),
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
|
||||
def _get_coordinator(
|
||||
hass: HomeAssistant, device_id: str
|
||||
) -> SystemBridgeDataUpdateCoordinator:
|
||||
"""Return the coordinator for a device id."""
|
||||
|
||||
device_registry = dr.async_get(hass)
|
||||
device_entry = device_registry.async_get(device_id)
|
||||
|
||||
if device_entry is None:
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="device_not_found",
|
||||
translation_placeholders={"device": device_id},
|
||||
)
|
||||
try:
|
||||
entry_id = next(
|
||||
entry.entry_id
|
||||
for entry in hass.config_entries.async_entries(DOMAIN)
|
||||
if entry.entry_id in device_entry.config_entries
|
||||
)
|
||||
except StopIteration as e:
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="device_not_found",
|
||||
translation_placeholders={"device": device_id},
|
||||
) from e
|
||||
entry: SystemBridgeConfigEntry = service.async_get_config_entry(
|
||||
hass, DOMAIN, entry_id
|
||||
)
|
||||
return entry.runtime_data
|
||||
|
||||
|
||||
async def handle_get_process_by_id(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the get process by id service call."""
|
||||
_LOGGER.debug("Get process by id: %s", service_call.data)
|
||||
coordinator = _get_coordinator(service_call.hass, service_call.data[CONF_BRIDGE])
|
||||
processes: list[Process] = coordinator.data.processes
|
||||
|
||||
# Find process.id from list, raise ServiceValidationError if not found
|
||||
try:
|
||||
return asdict(
|
||||
next(
|
||||
process
|
||||
for process in processes
|
||||
if process.id == service_call.data[CONF_ID]
|
||||
)
|
||||
)
|
||||
except StopIteration as e:
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="process_not_found",
|
||||
translation_placeholders={"id": service_call.data[CONF_ID]},
|
||||
) from e
|
||||
|
||||
|
||||
async def handle_get_processes_by_name(
|
||||
service_call: ServiceCall,
|
||||
) -> ServiceResponse:
|
||||
"""Handle the get process by name service call."""
|
||||
_LOGGER.debug("Get process by name: %s", service_call.data)
|
||||
coordinator = _get_coordinator(service_call.hass, service_call.data[CONF_BRIDGE])
|
||||
|
||||
# Find processes from list
|
||||
items: list[dict[str, Any]] = [
|
||||
asdict(process)
|
||||
for process in coordinator.data.processes
|
||||
if process.name is not None
|
||||
and service_call.data[CONF_NAME].lower() in process.name.lower()
|
||||
]
|
||||
|
||||
return {
|
||||
"count": len(items),
|
||||
"processes": list(items),
|
||||
}
|
||||
|
||||
|
||||
async def handle_open_path(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the open path service call."""
|
||||
_LOGGER.debug("Open path: %s", service_call.data)
|
||||
coordinator = _get_coordinator(service_call.hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.open_path(
|
||||
OpenPath(path=service_call.data[CONF_PATH])
|
||||
)
|
||||
return asdict(response)
|
||||
|
||||
|
||||
async def handle_power_command(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the power command service call."""
|
||||
_LOGGER.debug("Power command: %s", service_call.data)
|
||||
coordinator = _get_coordinator(service_call.hass, service_call.data[CONF_BRIDGE])
|
||||
response = await getattr(
|
||||
coordinator.websocket_client,
|
||||
POWER_COMMAND_MAP[service_call.data[CONF_COMMAND]],
|
||||
)()
|
||||
return asdict(response)
|
||||
|
||||
|
||||
async def handle_open_url(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the open url service call."""
|
||||
_LOGGER.debug("Open URL: %s", service_call.data)
|
||||
coordinator = _get_coordinator(service_call.hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.open_url(
|
||||
OpenUrl(url=service_call.data[CONF_URL])
|
||||
)
|
||||
return asdict(response)
|
||||
|
||||
|
||||
async def handle_send_keypress(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the send_keypress service call."""
|
||||
coordinator = _get_coordinator(service_call.hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.keyboard_keypress(
|
||||
KeyboardKey(key=service_call.data[CONF_KEY])
|
||||
)
|
||||
return asdict(response)
|
||||
|
||||
|
||||
async def handle_send_text(service_call: ServiceCall) -> ServiceResponse:
|
||||
"""Handle the send_text service call."""
|
||||
coordinator = _get_coordinator(service_call.hass, service_call.data[CONF_BRIDGE])
|
||||
response = await coordinator.websocket_client.keyboard_text(
|
||||
KeyboardText(text=service_call.data[CONF_TEXT])
|
||||
)
|
||||
return asdict(response)
|
||||
@@ -89,3 +89,4 @@ power_command:
|
||||
- "restart"
|
||||
- "shutdown"
|
||||
- "sleep"
|
||||
translation_key: "power_command"
|
||||
|
||||
@@ -178,6 +178,18 @@
|
||||
"title": "System Bridge upgrade required"
|
||||
}
|
||||
},
|
||||
"selector": {
|
||||
"power_command": {
|
||||
"options": {
|
||||
"hibernate": "Hibernate",
|
||||
"lock": "Lock",
|
||||
"logout": "Logout",
|
||||
"restart": "[%key:common::action::restart%]",
|
||||
"shutdown": "Shutdown",
|
||||
"sleep": "Sleep"
|
||||
}
|
||||
}
|
||||
},
|
||||
"services": {
|
||||
"get_process_by_id": {
|
||||
"description": "Gets a process by the ID.",
|
||||
|
||||
@@ -21,4 +21,4 @@ CONF_INSTANCE_ID = "instance_id"
|
||||
# Polling interval (seconds)
|
||||
DEFAULT_SCAN_INTERVAL = 1800
|
||||
|
||||
PLATFORMS: list[Platform] = [Platform.LIGHT, Platform.SWITCH]
|
||||
PLATFORMS: list[Platform] = [Platform.LIGHT, Platform.LOCK, Platform.SWITCH]
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
"""Lock platform for Xthings Cloud."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.components.lock import LockEntity
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
|
||||
from .coordinator import XthingsCloudConfigEntry
|
||||
from .entity import XthingsCloudEntity
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
entry: XthingsCloudConfigEntry,
|
||||
async_add_entities: AddConfigEntryEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up lock platform."""
|
||||
coordinator = entry.runtime_data
|
||||
entities = [
|
||||
XthingsCloudLock(coordinator, device_id, device_data)
|
||||
for device_id, device_data in coordinator.data.items()
|
||||
if device_data["type"] == "lock"
|
||||
]
|
||||
async_add_entities(entities)
|
||||
|
||||
|
||||
class XthingsCloudLock(XthingsCloudEntity, LockEntity):
|
||||
"""Xthings Cloud lock entity."""
|
||||
|
||||
@property
|
||||
def is_locked(self) -> bool | None:
|
||||
"""Return true if lock is locked."""
|
||||
return self.device_data["status"].get("locked")
|
||||
|
||||
@property
|
||||
def is_jammed(self) -> bool | None:
|
||||
"""Return true if lock is jammed."""
|
||||
return self.device_data["status"].get("jammed")
|
||||
|
||||
async def async_lock(self, **kwargs: Any) -> None:
|
||||
"""Lock the device."""
|
||||
await self.coordinator.client.async_lock_lock(self._device_id)
|
||||
|
||||
async def async_unlock(self, **kwargs: Any) -> None:
|
||||
"""Unlock the device."""
|
||||
await self.coordinator.client.async_lock_unlock(self._device_id)
|
||||
Generated
+1
@@ -59,6 +59,7 @@ FLOWS = {
|
||||
"amberelectric",
|
||||
"ambient_network",
|
||||
"ambient_station",
|
||||
"analytics",
|
||||
"analytics_insights",
|
||||
"android_ip_webcam",
|
||||
"androidtv",
|
||||
|
||||
Generated
+1
-1
@@ -1516,7 +1516,7 @@ lxml==6.0.1
|
||||
matrix-nio==0.25.2
|
||||
|
||||
# homeassistant.components.matter
|
||||
matter-python-client==0.6.0
|
||||
matter-python-client==0.7.1
|
||||
|
||||
# homeassistant.components.maxcube
|
||||
maxcube-api==0.4.3
|
||||
|
||||
@@ -6,15 +6,18 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.analytics import LABS_SNAPSHOT_FEATURE
|
||||
from homeassistant.components.analytics import CONF_SNAPSHOTS_URL, LABS_SNAPSHOT_FEATURE
|
||||
from homeassistant.components.analytics.const import (
|
||||
BASIC_ENDPOINT_URL,
|
||||
BASIC_ENDPOINT_URL_DEV,
|
||||
DOMAIN,
|
||||
SNAPSHOT_DEFAULT_URL,
|
||||
SNAPSHOT_URL_PATH,
|
||||
STORAGE_KEY,
|
||||
)
|
||||
from homeassistant.components.hassio import HassioNotReadyError
|
||||
from homeassistant.components.labs import async_update_preview_feature
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.setup import async_setup_component
|
||||
@@ -37,6 +40,175 @@ async def test_setup(hass: HomeAssistant) -> None:
|
||||
assert DOMAIN in hass.data
|
||||
|
||||
|
||||
async def test_setup_with_snapshots_url(
|
||||
hass: HomeAssistant,
|
||||
hass_storage: dict[str, Any],
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Test setup with snapshots_url in YAML config sends snapshots to that URL."""
|
||||
custom_url = "https://custom-snapshot-endpoint.example.com"
|
||||
snapshot_endpoint = custom_url + SNAPSHOT_URL_PATH
|
||||
aioclient_mock.post(snapshot_endpoint, status=200, json={})
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.analytics.analytics._async_snapshot_payload",
|
||||
return_value={"mock": {}},
|
||||
):
|
||||
assert await async_setup_component(hass, "labs", {})
|
||||
assert await async_setup_component(
|
||||
hass, DOMAIN, {DOMAIN: {CONF_SNAPSHOTS_URL: custom_url}}
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
ws_client = await hass_ws_client(hass)
|
||||
await ws_client.send_json_auto_id(
|
||||
{"type": "analytics/preferences", "preferences": {"snapshots": True}}
|
||||
)
|
||||
assert (await ws_client.receive_json())["success"]
|
||||
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(hours=25))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert any(str(call[1]) == snapshot_endpoint for call in aioclient_mock.mock_calls)
|
||||
|
||||
|
||||
async def test_setup_entry_supervisor_not_ready(hass: HomeAssistant) -> None:
|
||||
"""Test that HassioNotReadyError raises ConfigEntryNotReady."""
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.analytics.analytics.is_hassio",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.hassio.get_supervisor_info",
|
||||
side_effect=HassioNotReadyError,
|
||||
),
|
||||
):
|
||||
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
entry = hass.config_entries.async_entries(DOMAIN)[0]
|
||||
assert entry.state is ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
async def test_schedule_starts_and_sends_analytics(
|
||||
hass: HomeAssistant,
|
||||
hass_storage: dict[str, Any],
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Test that the analytics schedule fires and sends analytics after time travel."""
|
||||
aioclient_mock.post(BASIC_ENDPOINT_URL, status=200)
|
||||
aioclient_mock.post(BASIC_ENDPOINT_URL_DEV, status=200)
|
||||
|
||||
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
ws_client = await hass_ws_client(hass)
|
||||
with patch("homeassistant.components.analytics.analytics.HA_VERSION", MOCK_VERSION):
|
||||
await ws_client.send_json_auto_id(
|
||||
{"type": "analytics/preferences", "preferences": {"base": True}}
|
||||
)
|
||||
assert (await ws_client.receive_json())["success"]
|
||||
|
||||
assert len(aioclient_mock.mock_calls) == 0
|
||||
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=901))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(aioclient_mock.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_unload_entry(
|
||||
hass: HomeAssistant,
|
||||
hass_storage: dict[str, Any],
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
aioclient_mock: AiohttpClientMocker,
|
||||
) -> None:
|
||||
"""Test that unloading the config entry stops the analytics schedule."""
|
||||
aioclient_mock.post(BASIC_ENDPOINT_URL, status=200)
|
||||
aioclient_mock.post(BASIC_ENDPOINT_URL_DEV, status=200)
|
||||
|
||||
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
ws_client = await hass_ws_client(hass)
|
||||
with patch("homeassistant.components.analytics.analytics.HA_VERSION", MOCK_VERSION):
|
||||
await ws_client.send_json_auto_id(
|
||||
{"type": "analytics/preferences", "preferences": {"base": True}}
|
||||
)
|
||||
await ws_client.receive_json()
|
||||
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=901))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(aioclient_mock.mock_calls) == 1
|
||||
|
||||
entry = hass.config_entries.async_entries(DOMAIN)[0]
|
||||
await hass.config_entries.async_unload(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(days=2))
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert len(aioclient_mock.mock_calls) == 1
|
||||
|
||||
|
||||
async def test_websocket_not_loaded(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
) -> None:
|
||||
"""Test websocket returns error when analytics entry failed to load."""
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.analytics.analytics.is_hassio",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.hassio.get_supervisor_info",
|
||||
side_effect=HassioNotReadyError,
|
||||
),
|
||||
):
|
||||
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
ws_client = await hass_ws_client(hass)
|
||||
await ws_client.send_json_auto_id({"type": "analytics"})
|
||||
response = await ws_client.receive_json()
|
||||
|
||||
assert not response["success"]
|
||||
assert response["error"]["code"] == "not_found"
|
||||
|
||||
|
||||
async def test_websocket_preferences_not_loaded(
|
||||
hass: HomeAssistant,
|
||||
hass_ws_client: WebSocketGenerator,
|
||||
) -> None:
|
||||
"""Test preferences websocket returns error when analytics entry failed to load."""
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.analytics.analytics.is_hassio",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.hassio.get_supervisor_info",
|
||||
side_effect=HassioNotReadyError,
|
||||
),
|
||||
):
|
||||
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
ws_client = await hass_ws_client(hass)
|
||||
await ws_client.send_json_auto_id(
|
||||
{"type": "analytics/preferences", "preferences": {"base": True}}
|
||||
)
|
||||
response = await ws_client.receive_json()
|
||||
|
||||
assert not response["success"]
|
||||
assert response["error"]["code"] == "not_found"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_snapshot_payload")
|
||||
async def test_labs_feature_toggle(
|
||||
hass: HomeAssistant,
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from homeassistant.components.homeassistant_connect_zbt2.const import DOMAIN
|
||||
from homeassistant.components.usb import DOMAIN as USB_DOMAIN
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.setup import async_setup_component
|
||||
@@ -65,3 +66,66 @@ async def test_hardware_info(
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
async def test_hardware_info_ignored_entry(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator, addon_store_info
|
||||
) -> None:
|
||||
"""Test ignored discovery entries don't crash hardware info.
|
||||
|
||||
Regression test for https://github.com/home-assistant/core/issues/170270
|
||||
"""
|
||||
assert await async_setup_component(hass, USB_DOMAIN, {})
|
||||
hass.bus.async_fire(EVENT_HOMEASSISTANT_STARTED)
|
||||
|
||||
# Setup the normal entry so the hardware platform is loaded
|
||||
normal_entry = MockConfigEntry(
|
||||
data=CONFIG_ENTRY_DATA,
|
||||
domain=DOMAIN,
|
||||
options={},
|
||||
title="Home Assistant Connect ZBT-2",
|
||||
unique_id="normal_1",
|
||||
version=1,
|
||||
minor_version=1,
|
||||
)
|
||||
normal_entry.add_to_hass(hass)
|
||||
assert await hass.config_entries.async_setup(normal_entry.entry_id)
|
||||
|
||||
# Setup an ignored config entry without USB data
|
||||
ignored_entry = MockConfigEntry(
|
||||
data={},
|
||||
domain=DOMAIN,
|
||||
options={},
|
||||
title="Home Assistant Connect ZBT-2",
|
||||
unique_id="ignored_1",
|
||||
version=1,
|
||||
minor_version=2,
|
||||
source="ignore",
|
||||
)
|
||||
ignored_entry.add_to_hass(hass)
|
||||
assert ignored_entry.state is ConfigEntryState.NOT_LOADED
|
||||
|
||||
client = await hass_ws_client(hass)
|
||||
|
||||
await client.send_json({"id": 1, "type": "hardware/info"})
|
||||
msg = await client.receive_json()
|
||||
|
||||
assert msg["id"] == 1
|
||||
assert msg["success"]
|
||||
assert msg["result"] == {
|
||||
"hardware": [
|
||||
{
|
||||
"board": None,
|
||||
"config_entries": [normal_entry.entry_id],
|
||||
"dongle": {
|
||||
"vid": "303A",
|
||||
"pid": "4001",
|
||||
"serial_number": "80B54EEFAE18",
|
||||
"manufacturer": "Nabu Casa",
|
||||
"description": "ZBT-2",
|
||||
},
|
||||
"name": "Home Assistant Connect ZBT-2",
|
||||
"url": "https://support.nabucasa.com/hc/en-us/categories/24734620813469-Home-Assistant-Connect-ZBT-1",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -798,7 +798,7 @@
|
||||
'object_id_base': 'Uptime',
|
||||
'options': dict({
|
||||
}),
|
||||
'original_device_class': <SensorDeviceClass.TIMESTAMP: 'timestamp'>,
|
||||
'original_device_class': <SensorDeviceClass.UPTIME: 'uptime'>,
|
||||
'original_icon': None,
|
||||
'original_name': 'Uptime',
|
||||
'platform': 'homewizard',
|
||||
@@ -813,7 +813,7 @@
|
||||
# name: test_sensors[HWE-BAT-entity_ids10][sensor.device_uptime:state]
|
||||
StateSnapshot({
|
||||
'attributes': ReadOnlyDict({
|
||||
'device_class': 'timestamp',
|
||||
'device_class': 'uptime',
|
||||
'friendly_name': 'Device Uptime',
|
||||
}),
|
||||
'context': <ANY>,
|
||||
|
||||
@@ -116,3 +116,30 @@ async def test_get_triggers(
|
||||
]
|
||||
|
||||
assert triggers == unordered(expected_triggers)
|
||||
|
||||
|
||||
async def test_get_triggers_for_removed_device(
|
||||
hass: HomeAssistant,
|
||||
mock_bridge_v2: Mock,
|
||||
v2_resources_test_data: JsonArrayType,
|
||||
device_registry: dr.DeviceRegistry,
|
||||
) -> None:
|
||||
"""Test triggers for a device removed from the bridge.
|
||||
|
||||
Regression test for https://github.com/home-assistant/core/issues/152937
|
||||
"""
|
||||
await mock_bridge_v2.api.load_test_data(v2_resources_test_data)
|
||||
await setup_platform(
|
||||
hass, mock_bridge_v2, [Platform.BINARY_SENSOR, Platform.SENSOR]
|
||||
)
|
||||
|
||||
# Create a device entry with a Hue ID that doesn't exist on the bridge
|
||||
orphaned_device = device_registry.async_get_or_create(
|
||||
config_entry_id=mock_bridge_v2.config_entry.entry_id,
|
||||
identifiers={(hue.DOMAIN, "non-existent-hue-device-id")},
|
||||
)
|
||||
|
||||
triggers = await async_get_device_automations(
|
||||
hass, DeviceAutomationType.TRIGGER, orphaned_device.id
|
||||
)
|
||||
assert triggers == []
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""The tests for the Light component."""
|
||||
|
||||
from typing import Any
|
||||
from unittest.mock import MagicMock, mock_open, patch
|
||||
|
||||
import pytest
|
||||
@@ -1708,42 +1709,50 @@ async def test_light_service_call_color_conversion(hass: HomeAssistant) -> None:
|
||||
assert data == {"brightness": 128, "color_temp_kelvin": 3451}
|
||||
|
||||
|
||||
async def test_light_service_call_color_conversion_named_tuple(
|
||||
@pytest.mark.parametrize(
|
||||
"color_input",
|
||||
[
|
||||
pytest.param(
|
||||
{"color_name": "maroon"},
|
||||
id="color_name",
|
||||
),
|
||||
pytest.param(
|
||||
{"rgb_color": color_util.RGBColor(128, 0, 0)},
|
||||
id="rgb_color_named_tuple",
|
||||
),
|
||||
],
|
||||
)
|
||||
async def test_light_turn_on_rgb_color_is_plain_tuple(
|
||||
hass: HomeAssistant,
|
||||
color_input: dict[str, Any],
|
||||
) -> None:
|
||||
"""Test a named tuple (RGBColor) is handled correctly."""
|
||||
"""Test that rgb_color passed to entity turn_on is always a plain tuple.
|
||||
|
||||
Covers two input paths that both resolve to the same RGB value (128, 0, 0):
|
||||
- color_name: goes through color_name_to_rgb (returns RGBColor NamedTuple),
|
||||
bypassing the service schema vol.Coerce(tuple) coercion.
|
||||
- rgb_color: RGBColor NamedTuple passed directly, converted by the schema.
|
||||
"""
|
||||
entities = [
|
||||
MockLight("Test_hs", STATE_ON),
|
||||
MockLight("Test_rgb", STATE_ON),
|
||||
MockLight("Test_xy", STATE_ON),
|
||||
MockLight("Test_all", STATE_ON),
|
||||
MockLight("Test_rgbw", STATE_ON),
|
||||
MockLight("Test_rgbww", STATE_ON),
|
||||
MockLight("Test_hs", STATE_ON, supported_color_modes={light.ColorMode.HS}),
|
||||
MockLight("Test_rgb", STATE_ON, supported_color_modes={light.ColorMode.RGB}),
|
||||
MockLight("Test_xy", STATE_ON, supported_color_modes={light.ColorMode.XY}),
|
||||
MockLight(
|
||||
"Test_all",
|
||||
STATE_ON,
|
||||
supported_color_modes={
|
||||
light.ColorMode.HS,
|
||||
light.ColorMode.RGB,
|
||||
light.ColorMode.XY,
|
||||
},
|
||||
),
|
||||
MockLight("Test_rgbw", STATE_ON, supported_color_modes={light.ColorMode.RGBW}),
|
||||
MockLight(
|
||||
"Test_rgbww", STATE_ON, supported_color_modes={light.ColorMode.RGBWW}
|
||||
),
|
||||
]
|
||||
setup_test_component_platform(hass, light.DOMAIN, entities)
|
||||
|
||||
entity0 = entities[0]
|
||||
entity0.supported_color_modes = {light.ColorMode.HS}
|
||||
|
||||
entity1 = entities[1]
|
||||
entity1.supported_color_modes = {light.ColorMode.RGB}
|
||||
|
||||
entity2 = entities[2]
|
||||
entity2.supported_color_modes = {light.ColorMode.XY}
|
||||
|
||||
entity3 = entities[3]
|
||||
entity3.supported_color_modes = {
|
||||
light.ColorMode.HS,
|
||||
light.ColorMode.RGB,
|
||||
light.ColorMode.XY,
|
||||
}
|
||||
|
||||
entity4 = entities[4]
|
||||
entity4.supported_color_modes = {light.ColorMode.RGBW}
|
||||
|
||||
entity5 = entities[5]
|
||||
entity5.supported_color_modes = {light.ColorMode.RGBWW}
|
||||
|
||||
assert await async_setup_component(hass, "light", {"light": {"platform": "test"}})
|
||||
await hass.async_block_till_done()
|
||||
|
||||
@@ -1751,30 +1760,25 @@ async def test_light_service_call_color_conversion_named_tuple(
|
||||
"light",
|
||||
"turn_on",
|
||||
{
|
||||
"entity_id": [
|
||||
entity0.entity_id,
|
||||
entity1.entity_id,
|
||||
entity2.entity_id,
|
||||
entity3.entity_id,
|
||||
entity4.entity_id,
|
||||
entity5.entity_id,
|
||||
],
|
||||
"entity_id": [entity.entity_id for entity in entities],
|
||||
"brightness_pct": 25,
|
||||
"rgb_color": color_util.RGBColor(128, 0, 0),
|
||||
**color_input,
|
||||
},
|
||||
blocking=True,
|
||||
)
|
||||
_, data = entity0.last_call("turn_on")
|
||||
_, data = entities[0].last_call("turn_on")
|
||||
assert data == {"brightness": 64, "hs_color": (0.0, 100.0)}
|
||||
_, data = entity1.last_call("turn_on")
|
||||
_, data = entities[1].last_call("turn_on")
|
||||
assert data == {"brightness": 64, "rgb_color": (128, 0, 0)}
|
||||
_, data = entity2.last_call("turn_on")
|
||||
assert type(data["rgb_color"]) is tuple
|
||||
_, data = entities[2].last_call("turn_on")
|
||||
assert data == {"brightness": 64, "xy_color": (0.701, 0.299)}
|
||||
_, data = entity3.last_call("turn_on")
|
||||
_, data = entities[3].last_call("turn_on")
|
||||
assert data == {"brightness": 64, "rgb_color": (128, 0, 0)}
|
||||
_, data = entity4.last_call("turn_on")
|
||||
assert type(data["rgb_color"]) is tuple
|
||||
_, data = entities[4].last_call("turn_on")
|
||||
assert data == {"brightness": 64, "rgbw_color": (128, 0, 0, 0)}
|
||||
_, data = entity5.last_call("turn_on")
|
||||
_, data = entities[5].last_call("turn_on")
|
||||
assert data == {"brightness": 64, "rgbww_color": (128, 0, 0, 0, 0)}
|
||||
|
||||
|
||||
|
||||
@@ -7,7 +7,9 @@
|
||||
"wifi_credentials_set": true,
|
||||
"thread_credentials_set": false,
|
||||
"min_supported_schema_version": 1,
|
||||
"bluetooth_enabled": false
|
||||
"bluetooth_enabled": false,
|
||||
"wifi_ssid": "test_ssid",
|
||||
"ble_proxy_enabled": false
|
||||
},
|
||||
"nodes": [
|
||||
{
|
||||
|
||||
@@ -8,7 +8,9 @@
|
||||
"wifi_credentials_set": true,
|
||||
"thread_credentials_set": false,
|
||||
"min_supported_schema_version": 1,
|
||||
"bluetooth_enabled": false
|
||||
"bluetooth_enabled": false,
|
||||
"wifi_ssid": "**REDACTED**",
|
||||
"ble_proxy_enabled": false
|
||||
},
|
||||
"nodes": [
|
||||
{
|
||||
|
||||
@@ -9,8 +9,12 @@ from matter_server.common.helpers.util import dataclass_from_dict
|
||||
from matter_server.common.models import ServerDiagnostics
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.diagnostics import async_redact_data
|
||||
from homeassistant.components.matter.const import DOMAIN
|
||||
from homeassistant.components.matter.diagnostics import redact_matter_attributes
|
||||
from homeassistant.components.matter.diagnostics import (
|
||||
SERVER_INFO_TO_REDACT,
|
||||
redact_matter_attributes,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import device_registry as dr
|
||||
|
||||
@@ -85,7 +89,7 @@ async def test_device_diagnostics(
|
||||
"""Test the device diagnostics."""
|
||||
system_info_dict = config_entry_diagnostics["info"]
|
||||
device_diagnostics_redacted = {
|
||||
"server_info": system_info_dict,
|
||||
"server_info": async_redact_data(system_info_dict, SERVER_INFO_TO_REDACT),
|
||||
"node": redact_matter_attributes(device_diagnostics),
|
||||
}
|
||||
server_diagnostics_response = {
|
||||
|
||||
@@ -749,7 +749,18 @@ MOCK_SUBENTRY_DEVICE_DATA = {
|
||||
}
|
||||
|
||||
MOCK_NOTIFY_SUBENTRY_DATA_MULTI = {
|
||||
"device": MOCK_SUBENTRY_DEVICE_DATA | {"mqtt_settings": {"qos": 2}},
|
||||
"device": MOCK_SUBENTRY_DEVICE_DATA
|
||||
| {
|
||||
"mqtt_settings": {
|
||||
"qos": 2.0,
|
||||
"message_expiry_interval": {
|
||||
"days": 0,
|
||||
"hours": 0,
|
||||
"minutes": 1,
|
||||
"seconds": 30,
|
||||
},
|
||||
}
|
||||
},
|
||||
"components": MOCK_SUBENTRY_NOTIFY_COMPONENT1 | MOCK_SUBENTRY_NOTIFY_COMPONENT2,
|
||||
} | MOCK_SUBENTRY_AVAILABILITY_DATA
|
||||
|
||||
@@ -882,7 +893,18 @@ MOCK_SUBENTRY_DATA_BAD_COMPONENT_SCHEMA = {
|
||||
"components": MOCK_SUBENTRY_NOTIFY_BAD_SCHEMA,
|
||||
}
|
||||
MOCK_SUBENTRY_DATA_SET_MIX = {
|
||||
"device": MOCK_SUBENTRY_DEVICE_DATA | {"mqtt_settings": {"qos": 0}},
|
||||
"device": MOCK_SUBENTRY_DEVICE_DATA
|
||||
| {
|
||||
"mqtt_settings": {
|
||||
"qos": 0,
|
||||
"message_expiry_interval": {
|
||||
"days": 0,
|
||||
"hours": 0,
|
||||
"minutes": 1,
|
||||
"seconds": 30,
|
||||
},
|
||||
}
|
||||
},
|
||||
"components": MOCK_SUBENTRY_NOTIFY_COMPONENT1
|
||||
| MOCK_SUBENTRY_NOTIFY_COMPONENT2
|
||||
| MOCK_SUBENTRY_LIGHT_BASIC_KELVIN_COMPONENT
|
||||
|
||||
@@ -88,6 +88,66 @@ async def test_sending_mqtt_commands(
|
||||
assert state.state == "2021-11-08T13:31:44+00:00"
|
||||
|
||||
|
||||
@pytest.mark.freeze_time("2021-11-08 13:31:44+00:00")
|
||||
@pytest.mark.parametrize(
|
||||
"hass_config",
|
||||
[
|
||||
{
|
||||
mqtt.DOMAIN: {
|
||||
button.DOMAIN: {
|
||||
"command_topic": "command-topic",
|
||||
"name": "test",
|
||||
"default_entity_id": "button.test_button",
|
||||
"payload_press": "beer press",
|
||||
"qos": "2",
|
||||
"message_expiry_interval": {
|
||||
"days": 0,
|
||||
"hours": 0,
|
||||
"minutes": 1,
|
||||
"seconds": 30,
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
{
|
||||
mqtt.DOMAIN: {
|
||||
button.DOMAIN: {
|
||||
"command_topic": "command-topic",
|
||||
"name": "test",
|
||||
"default_entity_id": "button.test_button",
|
||||
"payload_press": "beer press",
|
||||
"qos": "2",
|
||||
"message_expiry_interval": 90,
|
||||
}
|
||||
}
|
||||
},
|
||||
],
|
||||
)
|
||||
async def test_sending_mqtt_commands_with_message_expiry_interval(
|
||||
hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
|
||||
) -> None:
|
||||
"""Test the sending MQTT command with message expiry interval."""
|
||||
mqtt_mock = await mqtt_mock_entry()
|
||||
|
||||
state = hass.states.get("button.test_button")
|
||||
assert state.state == STATE_UNKNOWN
|
||||
assert state.attributes.get(ATTR_FRIENDLY_NAME) == "test"
|
||||
|
||||
await hass.services.async_call(
|
||||
button.DOMAIN,
|
||||
button.SERVICE_PRESS,
|
||||
{ATTR_ENTITY_ID: "button.test_button"},
|
||||
blocking=True,
|
||||
)
|
||||
|
||||
mqtt_mock.async_publish.assert_called_once_with(
|
||||
"command-topic", "beer press", 2, False, message_expiry_interval=90
|
||||
)
|
||||
mqtt_mock.async_publish.reset_mock()
|
||||
state = hass.states.get("button.test_button")
|
||||
assert state.state == "2021-11-08T13:31:44+00:00"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"hass_config",
|
||||
[
|
||||
|
||||
@@ -1502,12 +1502,13 @@ async def test_publish_error(
|
||||
|
||||
async def test_subscribe_error(
|
||||
hass: HomeAssistant,
|
||||
setup_with_birth_msg_client_mock: MqttMockPahoClient,
|
||||
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||
mqtt_client_mock: MqttMockPahoClient,
|
||||
record_calls: MessageCallbackType,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test publish error."""
|
||||
mqtt_client_mock = setup_with_birth_msg_client_mock
|
||||
await mqtt_mock_entry()
|
||||
mqtt_client_mock.reset_mock()
|
||||
# simulate client is not connected error before subscribing
|
||||
mqtt_client_mock.subscribe.side_effect = lambda *args, **kwargs: (4, None)
|
||||
|
||||
@@ -5196,7 +5196,14 @@ async def test_subentry_reconfigure_update_device_properties(
|
||||
.schema["mqtt_settings"]
|
||||
.schema.schema.items()
|
||||
}
|
||||
assert mqtt_settings_key_descriptions == {"qos": {"suggested_value": 2}}
|
||||
assert mqtt_settings_key_descriptions == {
|
||||
"qos": {
|
||||
"suggested_value": 2,
|
||||
},
|
||||
"message_expiry_interval": {
|
||||
"suggested_value": {"days": 0, "hours": 0, "minutes": 1, "seconds": 30}
|
||||
},
|
||||
}
|
||||
assert result["data_schema"].schema["mqtt_settings"].options == {"collapsed": False}
|
||||
|
||||
# Update the device details
|
||||
@@ -5209,7 +5216,15 @@ async def test_subentry_reconfigure_update_device_properties(
|
||||
"model_id": "bn003",
|
||||
"manufacturer": "Beer Masters",
|
||||
"configuration_url": "https://example.com",
|
||||
"mqtt_settings": {"qos": 1},
|
||||
"mqtt_settings": {
|
||||
"qos": 1,
|
||||
"message_expiry_interval": {
|
||||
"days": 0,
|
||||
"hours": 0,
|
||||
"minutes": 0,
|
||||
"seconds": 30,
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
assert result["type"] is FlowResultType.MENU
|
||||
@@ -5232,6 +5247,12 @@ async def test_subentry_reconfigure_update_device_properties(
|
||||
assert device["sw_version"] == "1.1"
|
||||
assert device["manufacturer"] == "Beer Masters"
|
||||
assert device["mqtt_settings"]["qos"] == 1
|
||||
assert device["mqtt_settings"]["message_expiry_interval"] == {
|
||||
"days": 0,
|
||||
"hours": 0,
|
||||
"minutes": 0,
|
||||
"seconds": 30,
|
||||
}
|
||||
assert "qos" not in device
|
||||
|
||||
|
||||
|
||||
@@ -116,3 +116,54 @@ async def test_host_updated(hass: HomeAssistant) -> None:
|
||||
assert result["reason"] == "already_configured"
|
||||
|
||||
assert entry.data[CONF_HOST] == MOCK_SSDP_LOCATION
|
||||
|
||||
|
||||
async def test_ssdp_udn_as_list(hass: HomeAssistant) -> None:
|
||||
"""Test SSDP discovery when UDN is a list instead of a string.
|
||||
|
||||
Regression test for https://github.com/home-assistant/core/issues/171837
|
||||
"""
|
||||
list_udn_discovery = SsdpServiceInfo(
|
||||
ssdp_usn="usn",
|
||||
ssdp_st="st",
|
||||
ssdp_location=MOCK_SSDP_LOCATION,
|
||||
upnp={
|
||||
ATTR_UPNP_FRIENDLY_NAME: MOCK_FRIENDLY_NAME,
|
||||
ATTR_UPNP_UDN: [MOCK_UDN, "uuid:other"],
|
||||
},
|
||||
)
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={CONF_SOURCE: SOURCE_SSDP},
|
||||
data=list_udn_discovery,
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.FORM
|
||||
assert result["step_id"] == "confirm"
|
||||
assert result["description_placeholders"] == {CONF_NAME: MOCK_FRIENDLY_NAME}
|
||||
|
||||
result2 = await hass.config_entries.flow.async_configure(result["flow_id"], {})
|
||||
assert result2["type"] is FlowResultType.CREATE_ENTRY
|
||||
assert result2["title"] == MOCK_FRIENDLY_NAME
|
||||
assert result2["data"] == {CONF_HOST: MOCK_SSDP_LOCATION}
|
||||
|
||||
|
||||
async def test_ssdp_udn_as_empty_list(hass: HomeAssistant) -> None:
|
||||
"""Test SSDP discovery when UDN is an empty list."""
|
||||
empty_udn_discovery = SsdpServiceInfo(
|
||||
ssdp_usn="usn",
|
||||
ssdp_st="st",
|
||||
ssdp_location=MOCK_SSDP_LOCATION,
|
||||
upnp={
|
||||
ATTR_UPNP_FRIENDLY_NAME: MOCK_FRIENDLY_NAME,
|
||||
ATTR_UPNP_UDN: [],
|
||||
},
|
||||
)
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
context={CONF_SOURCE: SOURCE_SSDP},
|
||||
data=empty_udn_discovery,
|
||||
)
|
||||
|
||||
assert result["type"] is FlowResultType.ABORT
|
||||
assert result["reason"] == "incomplete_discovery"
|
||||
|
||||
@@ -126,6 +126,36 @@ def mock_websocket_client(
|
||||
message="Data listener registered",
|
||||
data={EventKey.MODULES: register_data_listener_model.modules},
|
||||
)
|
||||
websocket_client.open_url.return_value = Response(
|
||||
id=FIXTURE_REQUEST_ID,
|
||||
type=EventType.OPENED,
|
||||
message="Opened url",
|
||||
data={"url": "https://example.com"},
|
||||
)
|
||||
websocket_client.open_path.return_value = Response(
|
||||
id=FIXTURE_REQUEST_ID,
|
||||
type=EventType.OPENED,
|
||||
message="Opened file",
|
||||
data={"path": "/home/user/documents"},
|
||||
)
|
||||
websocket_client.power_shutdown.return_value = Response(
|
||||
id=FIXTURE_REQUEST_ID,
|
||||
type=EventType.POWER_SHUTDOWN,
|
||||
message="Shutdown",
|
||||
data={},
|
||||
)
|
||||
websocket_client.keyboard_keypress.return_value = Response(
|
||||
id=FIXTURE_REQUEST_ID,
|
||||
type=EventType.KEYBOARD_KEY_PRESSED,
|
||||
message="Keyboard key pressed",
|
||||
data={"key": "backspace"},
|
||||
)
|
||||
websocket_client.keyboard_text.return_value = Response(
|
||||
id=FIXTURE_REQUEST_ID,
|
||||
type=EventType.KEYBOARD_TEXT_SENT,
|
||||
message="Keyboard text sent",
|
||||
data={"text": "Hello world"},
|
||||
)
|
||||
# Trigger callback when listener is registered
|
||||
websocket_client.listen.side_effect = mock_data_listener
|
||||
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
# serializer version: 1
|
||||
# name: test_get_process_services[get_process_by_id]
|
||||
dict({
|
||||
'cpu_usage': 12.3,
|
||||
'created': 12.3,
|
||||
'id': 1234,
|
||||
'memory_usage': 12.3,
|
||||
'name': 'name',
|
||||
'path': '/path',
|
||||
'status': 'running',
|
||||
'username': 'username',
|
||||
'working_directory': '/working/directory',
|
||||
})
|
||||
# ---
|
||||
# name: test_get_process_services[get_processes_by_name]
|
||||
dict({
|
||||
'count': 1,
|
||||
'processes': list([
|
||||
dict({
|
||||
'cpu_usage': 12.3,
|
||||
'created': 12.3,
|
||||
'id': 1234,
|
||||
'memory_usage': 12.3,
|
||||
'name': 'name',
|
||||
'path': '/path',
|
||||
'status': 'running',
|
||||
'username': 'username',
|
||||
'working_directory': '/working/directory',
|
||||
}),
|
||||
]),
|
||||
})
|
||||
# ---
|
||||
# name: test_services[open_path]
|
||||
dict({
|
||||
'data': dict({
|
||||
'path': '/home/user/documents',
|
||||
}),
|
||||
'id': 'test',
|
||||
'message': 'Opened file',
|
||||
'module': None,
|
||||
'subtype': None,
|
||||
'type': <EventType.OPENED: 'OPENED'>,
|
||||
})
|
||||
# ---
|
||||
# name: test_services[open_url]
|
||||
dict({
|
||||
'data': dict({
|
||||
'url': 'https://example.com',
|
||||
}),
|
||||
'id': 'test',
|
||||
'message': 'Opened url',
|
||||
'module': None,
|
||||
'subtype': None,
|
||||
'type': <EventType.OPENED: 'OPENED'>,
|
||||
})
|
||||
# ---
|
||||
# name: test_services[power_command_shutdown]
|
||||
dict({
|
||||
'data': dict({
|
||||
}),
|
||||
'id': 'test',
|
||||
'message': 'Shutdown',
|
||||
'module': None,
|
||||
'subtype': None,
|
||||
'type': <EventType.POWER_SHUTDOWN: 'POWER_SHUTDOWN'>,
|
||||
})
|
||||
# ---
|
||||
# name: test_services[send_keypress]
|
||||
dict({
|
||||
'data': dict({
|
||||
'key': 'backspace',
|
||||
}),
|
||||
'id': 'test',
|
||||
'message': 'Keyboard key pressed',
|
||||
'module': None,
|
||||
'subtype': None,
|
||||
'type': <EventType.KEYBOARD_KEY_PRESSED: 'KEYBOARD_KEY_PRESSED'>,
|
||||
})
|
||||
# ---
|
||||
# name: test_services[send_text]
|
||||
dict({
|
||||
'data': dict({
|
||||
'text': 'Hello world',
|
||||
}),
|
||||
'id': 'test',
|
||||
'message': 'Keyboard text sent',
|
||||
'module': None,
|
||||
'subtype': None,
|
||||
'type': <EventType.KEYBOARD_TEXT_SENT: 'KEYBOARD_TEXT_SENT'>,
|
||||
})
|
||||
# ---
|
||||
@@ -0,0 +1,155 @@
|
||||
"""Tests for System Bridge actions."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
from systembridgeconnector.models.keyboard_key import KeyboardKey
|
||||
from systembridgeconnector.models.keyboard_text import KeyboardText
|
||||
from systembridgeconnector.models.open_path import OpenPath
|
||||
from systembridgeconnector.models.open_url import OpenUrl
|
||||
|
||||
from homeassistant.components.system_bridge.const import DOMAIN
|
||||
from homeassistant.components.system_bridge.services import (
|
||||
CONF_BRIDGE,
|
||||
CONF_KEY,
|
||||
CONF_TEXT,
|
||||
)
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.const import CONF_COMMAND, CONF_ID, CONF_NAME, CONF_PATH, CONF_URL
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import device_registry as dr
|
||||
|
||||
from . import FIXTURE_UUID
|
||||
|
||||
from tests.common import AsyncMock, MockConfigEntry
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("service", "service_data", "call_method", "call_args"),
|
||||
[
|
||||
(
|
||||
"open_path",
|
||||
{CONF_PATH: "/home/user/documents"},
|
||||
"open_path",
|
||||
[OpenPath(path="/home/user/documents")],
|
||||
),
|
||||
(
|
||||
"open_url",
|
||||
{CONF_URL: "https://example.com"},
|
||||
"open_url",
|
||||
[OpenUrl(url="https://example.com")],
|
||||
),
|
||||
(
|
||||
"power_command",
|
||||
{CONF_COMMAND: "shutdown"},
|
||||
"power_shutdown",
|
||||
[],
|
||||
),
|
||||
(
|
||||
"send_keypress",
|
||||
{CONF_KEY: "backspace"},
|
||||
"keyboard_keypress",
|
||||
[KeyboardKey(key="backspace")],
|
||||
),
|
||||
(
|
||||
"send_text",
|
||||
{CONF_TEXT: "Hello world"},
|
||||
"keyboard_text",
|
||||
[KeyboardText(text="Hello world")],
|
||||
),
|
||||
],
|
||||
ids=[
|
||||
"open_path",
|
||||
"open_url",
|
||||
"power_command_shutdown",
|
||||
"send_keypress",
|
||||
"send_text",
|
||||
],
|
||||
)
|
||||
@pytest.mark.usefixtures("mock_version")
|
||||
async def test_services(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_websocket_client: AsyncMock,
|
||||
snapshot: SnapshotAssertion,
|
||||
device_registry: dr.DeviceRegistry,
|
||||
service: str,
|
||||
service_data: dict[str, Any],
|
||||
call_method: str,
|
||||
call_args: list[Any],
|
||||
) -> None:
|
||||
"""Test System Bridge service action calls."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_config_entry.state is ConfigEntryState.LOADED
|
||||
|
||||
device_entry = device_registry.async_get_device(
|
||||
identifiers={(DOMAIN, FIXTURE_UUID)}
|
||||
)
|
||||
assert device_entry
|
||||
|
||||
resp = await hass.services.async_call(
|
||||
DOMAIN,
|
||||
service,
|
||||
{
|
||||
CONF_BRIDGE: device_entry.id,
|
||||
**service_data,
|
||||
},
|
||||
blocking=True,
|
||||
return_response=True,
|
||||
)
|
||||
|
||||
getattr(mock_websocket_client, call_method).assert_awaited_once_with(*call_args)
|
||||
assert resp == snapshot
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("service", "service_data"),
|
||||
[
|
||||
(
|
||||
"get_process_by_id",
|
||||
{CONF_ID: 1234},
|
||||
),
|
||||
(
|
||||
"get_processes_by_name",
|
||||
{CONF_NAME: "name"},
|
||||
),
|
||||
],
|
||||
ids=["get_process_by_id", "get_processes_by_name"],
|
||||
)
|
||||
@pytest.mark.usefixtures("mock_version", "mock_websocket_client")
|
||||
async def test_get_process_services(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
snapshot: SnapshotAssertion,
|
||||
device_registry: dr.DeviceRegistry,
|
||||
service: str,
|
||||
service_data: dict[str, Any],
|
||||
) -> None:
|
||||
"""Test System Bridge get process service action calls."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_config_entry.state is ConfigEntryState.LOADED
|
||||
|
||||
device_entry = device_registry.async_get_device(
|
||||
identifiers={(DOMAIN, FIXTURE_UUID)}
|
||||
)
|
||||
assert device_entry
|
||||
|
||||
resp = await hass.services.async_call(
|
||||
DOMAIN,
|
||||
service,
|
||||
{
|
||||
CONF_BRIDGE: device_entry.id,
|
||||
**service_data,
|
||||
},
|
||||
blocking=True,
|
||||
return_response=True,
|
||||
)
|
||||
|
||||
assert resp == snapshot
|
||||
@@ -50,6 +50,7 @@ def device_fixtures() -> list[str]:
|
||||
"XT-LT200",
|
||||
"XT-PL50",
|
||||
"XT-PL100",
|
||||
"XT-LK50",
|
||||
]
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
{
|
||||
"id": "dev_lock_001",
|
||||
"name": "Front Door Lock",
|
||||
"type": "lock",
|
||||
"model": "XT-LK50",
|
||||
"version": "1.0.0",
|
||||
"online": true,
|
||||
"status": {
|
||||
"locked": true,
|
||||
"jammed": false,
|
||||
"battery": 85
|
||||
}
|
||||
}
|
||||
@@ -154,3 +154,34 @@
|
||||
'via_device_id': None,
|
||||
})
|
||||
# ---
|
||||
# name: test_devices[XT-LK50]
|
||||
DeviceRegistryEntrySnapshot({
|
||||
'area_id': None,
|
||||
'config_entries': <ANY>,
|
||||
'config_entries_subentries': <ANY>,
|
||||
'configuration_url': None,
|
||||
'connections': set({
|
||||
}),
|
||||
'disabled_by': None,
|
||||
'entry_type': None,
|
||||
'hw_version': None,
|
||||
'id': <ANY>,
|
||||
'identifiers': set({
|
||||
tuple(
|
||||
'xthings_cloud',
|
||||
'dev_lock_001',
|
||||
),
|
||||
}),
|
||||
'labels': set({
|
||||
}),
|
||||
'manufacturer': 'Xthings',
|
||||
'model': 'XT-LK50',
|
||||
'model_id': None,
|
||||
'name': 'Front Door Lock',
|
||||
'name_by_user': None,
|
||||
'primary_config_entry': <ANY>,
|
||||
'serial_number': None,
|
||||
'sw_version': '1.0.0',
|
||||
'via_device_id': None,
|
||||
})
|
||||
# ---
|
||||
|
||||
@@ -0,0 +1,52 @@
|
||||
# serializer version: 1
|
||||
# name: test_locks[lock.front_door_lock-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': list([
|
||||
None,
|
||||
]),
|
||||
'area_id': None,
|
||||
'capabilities': None,
|
||||
'config_entry_id': <ANY>,
|
||||
'config_subentry_id': <ANY>,
|
||||
'device_class': None,
|
||||
'device_id': <ANY>,
|
||||
'disabled_by': None,
|
||||
'domain': 'lock',
|
||||
'entity_category': None,
|
||||
'entity_id': 'lock.front_door_lock',
|
||||
'has_entity_name': True,
|
||||
'hidden_by': None,
|
||||
'icon': None,
|
||||
'id': <ANY>,
|
||||
'labels': set({
|
||||
}),
|
||||
'name': None,
|
||||
'object_id_base': None,
|
||||
'options': dict({
|
||||
}),
|
||||
'original_device_class': None,
|
||||
'original_icon': None,
|
||||
'original_name': None,
|
||||
'platform': 'xthings_cloud',
|
||||
'previous_unique_id': None,
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': None,
|
||||
'unique_id': 'dev_lock_001',
|
||||
'unit_of_measurement': None,
|
||||
})
|
||||
# ---
|
||||
# name: test_locks[lock.front_door_lock-state]
|
||||
StateSnapshot({
|
||||
'attributes': ReadOnlyDict({
|
||||
'friendly_name': 'Front Door Lock',
|
||||
'supported_features': <LockEntityFeature: 0>,
|
||||
}),
|
||||
'context': <ANY>,
|
||||
'entity_id': 'lock.front_door_lock',
|
||||
'last_changed': <ANY>,
|
||||
'last_reported': <ANY>,
|
||||
'last_updated': <ANY>,
|
||||
'state': 'locked',
|
||||
})
|
||||
# ---
|
||||
@@ -27,9 +27,10 @@ from .const import (
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_setup_entry")
|
||||
async def test_user_flow_success(
|
||||
hass: HomeAssistant, mock_api_client: AsyncMock
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_api_client: AsyncMock,
|
||||
) -> None:
|
||||
"""Test successful user login flow."""
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
@@ -61,9 +62,9 @@ async def test_user_flow_success(
|
||||
(RuntimeError("unexpected"), "unknown"),
|
||||
],
|
||||
)
|
||||
@pytest.mark.usefixtures("mock_setup_entry")
|
||||
async def test_user_flow_error_and_recover(
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_api_client: AsyncMock,
|
||||
side_effect: Exception,
|
||||
expected_error: str,
|
||||
@@ -90,9 +91,11 @@ async def test_user_flow_error_and_recover(
|
||||
assert result["type"] is FlowResultType.CREATE_ENTRY
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("mock_setup_entry")
|
||||
async def test_user_flow_already_configured(
|
||||
hass: HomeAssistant, mock_api_client: AsyncMock, mock_config_entry: MockConfigEntry
|
||||
hass: HomeAssistant,
|
||||
mock_setup_entry: AsyncMock,
|
||||
mock_api_client: AsyncMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
) -> None:
|
||||
"""Test user flow aborts if same account already configured."""
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
|
||||
@@ -0,0 +1,105 @@
|
||||
"""Tests for Xthings Cloud lock platform."""
|
||||
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
|
||||
from homeassistant.components.lock import (
|
||||
DOMAIN as LOCK_DOMAIN,
|
||||
SERVICE_LOCK,
|
||||
SERVICE_UNLOCK,
|
||||
)
|
||||
from homeassistant.const import ATTR_ENTITY_ID, STATE_UNAVAILABLE, Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
|
||||
from . import get_device_by_id, setup_integration
|
||||
|
||||
from tests.common import MockConfigEntry, snapshot_platform
|
||||
|
||||
|
||||
async def test_locks(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_api_client: AsyncMock,
|
||||
entity_registry: er.EntityRegistry,
|
||||
snapshot: SnapshotAssertion,
|
||||
) -> None:
|
||||
"""Test lock entities are created correctly."""
|
||||
with patch("homeassistant.components.xthings_cloud.PLATFORMS", [Platform.LOCK]):
|
||||
await setup_integration(hass, mock_config_entry)
|
||||
|
||||
await snapshot_platform(
|
||||
hass, entity_registry, snapshot, mock_config_entry.entry_id
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("service", "method"),
|
||||
[
|
||||
(SERVICE_LOCK, "async_lock_lock"),
|
||||
(SERVICE_UNLOCK, "async_lock_unlock"),
|
||||
],
|
||||
)
|
||||
async def test_lock_lock_unlock(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_api_client: AsyncMock,
|
||||
service: str,
|
||||
method: str,
|
||||
) -> None:
|
||||
"""Test locking and unlocking a lock."""
|
||||
with patch("homeassistant.components.xthings_cloud.PLATFORMS", [Platform.LOCK]):
|
||||
await setup_integration(hass, mock_config_entry)
|
||||
|
||||
await hass.services.async_call(
|
||||
LOCK_DOMAIN,
|
||||
service,
|
||||
{ATTR_ENTITY_ID: "lock.front_door_lock"},
|
||||
blocking=True,
|
||||
)
|
||||
getattr(mock_api_client, method).assert_called_once_with("dev_lock_001")
|
||||
|
||||
|
||||
async def test_lock_unavailable_when_offline(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_api_client: AsyncMock,
|
||||
) -> None:
|
||||
"""Test lock shows unavailable when device is offline."""
|
||||
get_device_by_id(mock_api_client, "dev_lock_001")["online"] = False
|
||||
with patch("homeassistant.components.xthings_cloud.PLATFORMS", [Platform.LOCK]):
|
||||
await setup_integration(hass, mock_config_entry)
|
||||
|
||||
state = hass.states.get("lock.front_door_lock")
|
||||
assert state is not None
|
||||
assert state.state == STATE_UNAVAILABLE
|
||||
|
||||
|
||||
async def test_updating_state(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_api_client: AsyncMock,
|
||||
mock_websocket: AsyncMock,
|
||||
) -> None:
|
||||
"""Test updating state."""
|
||||
with patch("homeassistant.components.xthings_cloud.PLATFORMS", [Platform.LOCK]):
|
||||
await setup_integration(hass, mock_config_entry)
|
||||
|
||||
state = hass.states.get("lock.front_door_lock")
|
||||
assert state is not None
|
||||
assert state.state == "locked"
|
||||
|
||||
mock_websocket.call_args[1]["on_device_status"](
|
||||
"dev_lock_001",
|
||||
{
|
||||
"locked": False,
|
||||
"jammed": False,
|
||||
"battery": 80,
|
||||
},
|
||||
)
|
||||
|
||||
state = hass.states.get("lock.front_door_lock")
|
||||
assert state is not None
|
||||
assert state.state == "unlocked"
|
||||
+3
-1
@@ -1074,7 +1074,9 @@ def mqtt_client_mock(hass: HomeAssistant) -> Generator[MqttMockPahoClient]:
|
||||
|
||||
@ha.callback
|
||||
def _async_fire_mqtt_message(topic, payload, qos, retain, properties=None):
|
||||
async_fire_mqtt_message(hass, topic, payload or b"", qos, retain)
|
||||
async_fire_mqtt_message(
|
||||
hass, topic, payload or b"", qos, retain, properties=properties
|
||||
)
|
||||
mid = get_mid()
|
||||
hass.loop.call_soon(
|
||||
mock_client.on_publish, Mock(), 0, mid, MockMqttReasonCode(), None
|
||||
|
||||
Reference in New Issue
Block a user