mirror of
https://github.com/home-assistant/core.git
synced 2026-02-28 13:01:35 +01:00
Compare commits
14 Commits
remove-vol
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ee05f14530 | ||
|
|
f0ba5178b7 | ||
|
|
df51ac932b | ||
|
|
e96b5f2eb1 | ||
|
|
4e59c89327 | ||
|
|
15676021a9 | ||
|
|
d3197a0d1e | ||
|
|
35692b335c | ||
|
|
cc5c810501 | ||
|
|
f2681f2dc8 | ||
|
|
fe0a22c790 | ||
|
|
186ab50458 | ||
|
|
b524c40176 | ||
|
|
642864959a |
@@ -545,6 +545,7 @@ homeassistant.components.tcp.*
|
||||
homeassistant.components.technove.*
|
||||
homeassistant.components.tedee.*
|
||||
homeassistant.components.telegram_bot.*
|
||||
homeassistant.components.teslemetry.*
|
||||
homeassistant.components.text.*
|
||||
homeassistant.components.thethingsnetwork.*
|
||||
homeassistant.components.threshold.*
|
||||
|
||||
@@ -93,7 +93,6 @@ class AirobotNumber(AirobotEntity, NumberEntity):
|
||||
raise ServiceValidationError(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="set_value_failed",
|
||||
translation_placeholders={"error": str(err)},
|
||||
) from err
|
||||
else:
|
||||
await self.coordinator.async_request_refresh()
|
||||
|
||||
@@ -112,7 +112,7 @@
|
||||
"message": "Failed to set temperature to {temperature}."
|
||||
},
|
||||
"set_value_failed": {
|
||||
"message": "Failed to set value: {error}"
|
||||
"message": "Failed to set value."
|
||||
},
|
||||
"switch_turn_off_failed": {
|
||||
"message": "Failed to turn off {switch}."
|
||||
|
||||
@@ -400,8 +400,8 @@ def _convert_content(
|
||||
# If there is only one text block, simplify the content to a string
|
||||
messages[-1]["content"] = messages[-1]["content"][0]["text"]
|
||||
else:
|
||||
# Note: We don't pass SystemContent here as its passed to the API as the prompt
|
||||
raise TypeError(f"Unexpected content type: {type(content)}")
|
||||
# Note: We don't pass SystemContent here as it's passed to the API as the prompt
|
||||
raise HomeAssistantError("Unexpected content type in chat log")
|
||||
|
||||
return messages, container_id
|
||||
|
||||
@@ -442,8 +442,8 @@ async def _transform_stream( # noqa: C901 - This is complex, but better to have
|
||||
|
||||
Each message could contain multiple blocks of the same type.
|
||||
"""
|
||||
if stream is None:
|
||||
raise TypeError("Expected a stream of messages")
|
||||
if stream is None or not hasattr(stream, "__aiter__"):
|
||||
raise HomeAssistantError("Expected a stream of messages")
|
||||
|
||||
current_tool_block: ToolUseBlockParam | ServerToolUseBlockParam | None = None
|
||||
current_tool_args: str
|
||||
@@ -456,8 +456,6 @@ async def _transform_stream( # noqa: C901 - This is complex, but better to have
|
||||
LOGGER.debug("Received response: %s", response)
|
||||
|
||||
if isinstance(response, RawMessageStartEvent):
|
||||
if response.message.role != "assistant":
|
||||
raise ValueError("Unexpected message role")
|
||||
input_usage = response.message.usage
|
||||
first_block = True
|
||||
elif isinstance(response, RawContentBlockStartEvent):
|
||||
@@ -666,7 +664,7 @@ class AnthropicBaseLLMEntity(Entity):
|
||||
|
||||
system = chat_log.content[0]
|
||||
if not isinstance(system, conversation.SystemContent):
|
||||
raise TypeError("First message must be a system message")
|
||||
raise HomeAssistantError("First message must be a system message")
|
||||
|
||||
# System prompt with caching enabled
|
||||
system_prompt: list[TextBlockParam] = [
|
||||
|
||||
@@ -31,10 +31,7 @@ rules:
|
||||
test-before-setup: done
|
||||
unique-config-entry: done
|
||||
# Silver
|
||||
action-exceptions:
|
||||
status: todo
|
||||
comment: |
|
||||
Reevaluate exceptions for entity services.
|
||||
action-exceptions: done
|
||||
config-entry-unloading: done
|
||||
docs-configuration-parameters: done
|
||||
docs-installation-parameters: done
|
||||
|
||||
@@ -117,6 +117,7 @@ class SharpAquosTVDevice(MediaPlayerEntity):
|
||||
| MediaPlayerEntityFeature.VOLUME_SET
|
||||
| MediaPlayerEntityFeature.PLAY
|
||||
)
|
||||
_attr_volume_step = 2 / 60
|
||||
|
||||
def __init__(
|
||||
self, name: str, remote: sharp_aquos_rc.TV, power_on_enabled: bool = False
|
||||
@@ -161,22 +162,6 @@ class SharpAquosTVDevice(MediaPlayerEntity):
|
||||
"""Turn off tvplayer."""
|
||||
self._remote.power(0)
|
||||
|
||||
@_retry
|
||||
def volume_up(self) -> None:
|
||||
"""Volume up the media player."""
|
||||
if self.volume_level is None:
|
||||
_LOGGER.debug("Unknown volume in volume_up")
|
||||
return
|
||||
self._remote.volume(int(self.volume_level * 60) + 2)
|
||||
|
||||
@_retry
|
||||
def volume_down(self) -> None:
|
||||
"""Volume down media player."""
|
||||
if self.volume_level is None:
|
||||
_LOGGER.debug("Unknown volume in volume_down")
|
||||
return
|
||||
self._remote.volume(int(self.volume_level * 60) - 2)
|
||||
|
||||
@_retry
|
||||
def set_volume_level(self, volume: float) -> None:
|
||||
"""Set Volume media player."""
|
||||
|
||||
@@ -85,6 +85,7 @@ class BluesoundPlayer(CoordinatorEntity[BluesoundCoordinator], MediaPlayerEntity
|
||||
_attr_media_content_type = MediaType.MUSIC
|
||||
_attr_has_entity_name = True
|
||||
_attr_name = None
|
||||
_attr_volume_step = 0.01
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -688,24 +689,6 @@ class BluesoundPlayer(CoordinatorEntity[BluesoundCoordinator], MediaPlayerEntity
|
||||
|
||||
await self._player.play_url(url)
|
||||
|
||||
async def async_volume_up(self) -> None:
|
||||
"""Volume up the media player."""
|
||||
if self.volume_level is None:
|
||||
return
|
||||
|
||||
new_volume = self.volume_level + 0.01
|
||||
new_volume = min(1, new_volume)
|
||||
await self.async_set_volume_level(new_volume)
|
||||
|
||||
async def async_volume_down(self) -> None:
|
||||
"""Volume down the media player."""
|
||||
if self.volume_level is None:
|
||||
return
|
||||
|
||||
new_volume = self.volume_level - 0.01
|
||||
new_volume = max(0, new_volume)
|
||||
await self.async_set_volume_level(new_volume)
|
||||
|
||||
async def async_set_volume_level(self, volume: float) -> None:
|
||||
"""Send volume_up command to media player."""
|
||||
volume = int(round(volume * 100))
|
||||
|
||||
@@ -139,18 +139,6 @@ class AbstractDemoPlayer(MediaPlayerEntity):
|
||||
self._attr_is_volume_muted = mute
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
def volume_up(self) -> None:
|
||||
"""Increase volume."""
|
||||
assert self.volume_level is not None
|
||||
self._attr_volume_level = min(1.0, self.volume_level + 0.1)
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
def volume_down(self) -> None:
|
||||
"""Decrease volume."""
|
||||
assert self.volume_level is not None
|
||||
self._attr_volume_level = max(0.0, self.volume_level - 0.1)
|
||||
self.schedule_update_ha_state()
|
||||
|
||||
def set_volume_level(self, volume: float) -> None:
|
||||
"""Set the volume level, range 0..1."""
|
||||
self._attr_volume_level = volume
|
||||
|
||||
@@ -151,6 +151,8 @@ class AFSAPIDevice(MediaPlayerEntity):
|
||||
# If call to get_volume fails set to 0 and try again next time.
|
||||
if not self._max_volume:
|
||||
self._max_volume = int(await afsapi.get_volume_steps() or 1) - 1
|
||||
if self._max_volume:
|
||||
self._attr_volume_step = 1 / self._max_volume
|
||||
|
||||
if self._attr_state != MediaPlayerState.OFF:
|
||||
info_name = await afsapi.get_play_name()
|
||||
@@ -239,18 +241,6 @@ class AFSAPIDevice(MediaPlayerEntity):
|
||||
await self.fs_device.set_mute(mute)
|
||||
|
||||
# volume
|
||||
async def async_volume_up(self) -> None:
|
||||
"""Send volume up command."""
|
||||
volume = await self.fs_device.get_volume()
|
||||
volume = int(volume or 0) + 1
|
||||
await self.fs_device.set_volume(min(volume, self._max_volume or 1))
|
||||
|
||||
async def async_volume_down(self) -> None:
|
||||
"""Send volume down command."""
|
||||
volume = await self.fs_device.get_volume()
|
||||
volume = int(volume or 0) - 1
|
||||
await self.fs_device.set_volume(max(volume, 0))
|
||||
|
||||
async def async_set_volume_level(self, volume: float) -> None:
|
||||
"""Set volume command."""
|
||||
if self._max_volume: # Can't do anything sensible if not set
|
||||
|
||||
@@ -140,5 +140,5 @@
|
||||
"documentation": "https://www.home-assistant.io/integrations/govee_ble",
|
||||
"integration_type": "device",
|
||||
"iot_class": "local_push",
|
||||
"requirements": ["govee-ble==0.44.0"]
|
||||
"requirements": ["govee-ble==1.2.0"]
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
|
||||
import logging
|
||||
|
||||
from chip.clusters import Objects as clusters
|
||||
|
||||
ADDON_SLUG = "core_matter_server"
|
||||
|
||||
CONF_INTEGRATION_CREATED_ADDON = "integration_created_addon"
|
||||
@@ -15,3 +17,100 @@ ID_TYPE_DEVICE_ID = "deviceid"
|
||||
ID_TYPE_SERIAL = "serial"
|
||||
|
||||
FEATUREMAP_ATTRIBUTE_ID = 65532
|
||||
|
||||
# --- Lock domain constants ---
|
||||
|
||||
# Shared field keys
|
||||
ATTR_CREDENTIAL_RULE = "credential_rule"
|
||||
ATTR_MAX_CREDENTIALS_PER_USER = "max_credentials_per_user"
|
||||
ATTR_MAX_PIN_USERS = "max_pin_users"
|
||||
ATTR_MAX_RFID_USERS = "max_rfid_users"
|
||||
ATTR_MAX_USERS = "max_users"
|
||||
ATTR_SUPPORTS_USER_MGMT = "supports_user_management"
|
||||
ATTR_USER_INDEX = "user_index"
|
||||
ATTR_USER_NAME = "user_name"
|
||||
ATTR_USER_STATUS = "user_status"
|
||||
ATTR_USER_TYPE = "user_type"
|
||||
|
||||
# Magic values
|
||||
CLEAR_ALL_INDEX = 0xFFFE # Matter spec: pass to ClearUser/ClearCredential to clear all
|
||||
|
||||
# Timed request timeout for lock commands that modify state.
|
||||
# 10 seconds accounts for Thread network latency and retransmissions.
|
||||
LOCK_TIMED_REQUEST_TIMEOUT_MS = 10000
|
||||
|
||||
# Credential field keys
|
||||
ATTR_CREDENTIAL_DATA = "credential_data"
|
||||
ATTR_CREDENTIAL_INDEX = "credential_index"
|
||||
ATTR_CREDENTIAL_TYPE = "credential_type"
|
||||
|
||||
# Credential type strings
|
||||
CRED_TYPE_FACE = "face"
|
||||
CRED_TYPE_FINGERPRINT = "fingerprint"
|
||||
CRED_TYPE_FINGER_VEIN = "finger_vein"
|
||||
CRED_TYPE_PIN = "pin"
|
||||
CRED_TYPE_RFID = "rfid"
|
||||
|
||||
# User status mapping (Matter DoorLock UserStatusEnum)
|
||||
_UserStatus = clusters.DoorLock.Enums.UserStatusEnum
|
||||
USER_STATUS_MAP: dict[int, str] = {
|
||||
_UserStatus.kAvailable: "available",
|
||||
_UserStatus.kOccupiedEnabled: "occupied_enabled",
|
||||
_UserStatus.kOccupiedDisabled: "occupied_disabled",
|
||||
}
|
||||
USER_STATUS_REVERSE_MAP: dict[str, int] = {v: k for k, v in USER_STATUS_MAP.items()}
|
||||
|
||||
# User type mapping (Matter DoorLock UserTypeEnum)
|
||||
_UserType = clusters.DoorLock.Enums.UserTypeEnum
|
||||
USER_TYPE_MAP: dict[int, str] = {
|
||||
_UserType.kUnrestrictedUser: "unrestricted_user",
|
||||
_UserType.kYearDayScheduleUser: "year_day_schedule_user",
|
||||
_UserType.kWeekDayScheduleUser: "week_day_schedule_user",
|
||||
_UserType.kProgrammingUser: "programming_user",
|
||||
_UserType.kNonAccessUser: "non_access_user",
|
||||
_UserType.kForcedUser: "forced_user",
|
||||
_UserType.kDisposableUser: "disposable_user",
|
||||
_UserType.kExpiringUser: "expiring_user",
|
||||
_UserType.kScheduleRestrictedUser: "schedule_restricted_user",
|
||||
_UserType.kRemoteOnlyUser: "remote_only_user",
|
||||
}
|
||||
USER_TYPE_REVERSE_MAP: dict[str, int] = {v: k for k, v in USER_TYPE_MAP.items()}
|
||||
|
||||
# Credential type mapping (Matter DoorLock CredentialTypeEnum)
|
||||
_CredentialType = clusters.DoorLock.Enums.CredentialTypeEnum
|
||||
CREDENTIAL_TYPE_MAP: dict[int, str] = {
|
||||
_CredentialType.kProgrammingPIN: "programming_pin",
|
||||
_CredentialType.kPin: CRED_TYPE_PIN,
|
||||
_CredentialType.kRfid: CRED_TYPE_RFID,
|
||||
_CredentialType.kFingerprint: CRED_TYPE_FINGERPRINT,
|
||||
_CredentialType.kFingerVein: CRED_TYPE_FINGER_VEIN,
|
||||
_CredentialType.kFace: CRED_TYPE_FACE,
|
||||
_CredentialType.kAliroCredentialIssuerKey: "aliro_credential_issuer_key",
|
||||
_CredentialType.kAliroEvictableEndpointKey: "aliro_evictable_endpoint_key",
|
||||
_CredentialType.kAliroNonEvictableEndpointKey: "aliro_non_evictable_endpoint_key",
|
||||
}
|
||||
|
||||
# Credential rule mapping (Matter DoorLock CredentialRuleEnum)
|
||||
_CredentialRule = clusters.DoorLock.Enums.CredentialRuleEnum
|
||||
CREDENTIAL_RULE_MAP: dict[int, str] = {
|
||||
_CredentialRule.kSingle: "single",
|
||||
_CredentialRule.kDual: "dual",
|
||||
_CredentialRule.kTri: "tri",
|
||||
}
|
||||
CREDENTIAL_RULE_REVERSE_MAP: dict[str, int] = {
|
||||
v: k for k, v in CREDENTIAL_RULE_MAP.items()
|
||||
}
|
||||
|
||||
# Reverse mapping for credential types (str -> int)
|
||||
CREDENTIAL_TYPE_REVERSE_MAP: dict[str, int] = {
|
||||
v: k for k, v in CREDENTIAL_TYPE_MAP.items()
|
||||
}
|
||||
|
||||
# Credential types allowed in set/clear services (excludes programming_pin, aliro_*)
|
||||
SERVICE_CREDENTIAL_TYPES = [
|
||||
CRED_TYPE_PIN,
|
||||
CRED_TYPE_RFID,
|
||||
CRED_TYPE_FINGERPRINT,
|
||||
CRED_TYPE_FINGER_VEIN,
|
||||
CRED_TYPE_FACE,
|
||||
]
|
||||
|
||||
@@ -174,6 +174,27 @@
|
||||
}
|
||||
},
|
||||
"services": {
|
||||
"clear_lock_credential": {
|
||||
"service": "mdi:key-remove"
|
||||
},
|
||||
"clear_lock_user": {
|
||||
"service": "mdi:account-remove"
|
||||
},
|
||||
"get_lock_credential_status": {
|
||||
"service": "mdi:key-chain"
|
||||
},
|
||||
"get_lock_info": {
|
||||
"service": "mdi:lock-question"
|
||||
},
|
||||
"get_lock_users": {
|
||||
"service": "mdi:account-multiple"
|
||||
},
|
||||
"set_lock_credential": {
|
||||
"service": "mdi:key-plus"
|
||||
},
|
||||
"set_lock_user": {
|
||||
"service": "mdi:account-lock"
|
||||
},
|
||||
"water_heater_boost": {
|
||||
"service": "mdi:water-boiler"
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from chip.clusters import Objects as clusters
|
||||
from matter_server.common.errors import MatterError
|
||||
from matter_server.common.models import EventType, MatterNodeEvent
|
||||
|
||||
from homeassistant.components.lock import (
|
||||
@@ -17,32 +18,56 @@ from homeassistant.components.lock import (
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import ATTR_CODE, Platform
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
|
||||
from .const import LOGGER
|
||||
from .const import (
|
||||
ATTR_CREDENTIAL_DATA,
|
||||
ATTR_CREDENTIAL_INDEX,
|
||||
ATTR_CREDENTIAL_RULE,
|
||||
ATTR_CREDENTIAL_TYPE,
|
||||
ATTR_USER_INDEX,
|
||||
ATTR_USER_NAME,
|
||||
ATTR_USER_STATUS,
|
||||
ATTR_USER_TYPE,
|
||||
LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
LOGGER,
|
||||
)
|
||||
from .entity import MatterEntity, MatterEntityDescription
|
||||
from .helpers import get_matter
|
||||
from .lock_helpers import (
|
||||
DoorLockFeature,
|
||||
GetLockCredentialStatusResult,
|
||||
GetLockInfoResult,
|
||||
GetLockUsersResult,
|
||||
SetLockCredentialResult,
|
||||
clear_lock_credential,
|
||||
clear_lock_user,
|
||||
get_lock_credential_status,
|
||||
get_lock_info,
|
||||
get_lock_users,
|
||||
set_lock_credential,
|
||||
set_lock_user,
|
||||
)
|
||||
from .models import MatterDiscoverySchema
|
||||
|
||||
DOOR_LOCK_OPERATION_SOURCE = {
|
||||
# mapping from operation source id's to textual representation
|
||||
0: "Unspecified",
|
||||
1: "Manual", # [Optional]
|
||||
2: "Proprietary Remote", # [Optional]
|
||||
3: "Keypad", # [Optional]
|
||||
4: "Auto", # [Optional]
|
||||
5: "Button", # [Optional]
|
||||
6: "Schedule", # [HDSCH]
|
||||
7: "Remote", # [M]
|
||||
8: "RFID", # [RID]
|
||||
9: "Biometric", # [USR]
|
||||
10: "Aliro", # [Aliro]
|
||||
# Door lock operation source mapping (Matter DoorLock OperationSourceEnum)
|
||||
_OperationSource = clusters.DoorLock.Enums.OperationSourceEnum
|
||||
DOOR_LOCK_OPERATION_SOURCE: dict[int, str] = {
|
||||
_OperationSource.kUnspecified: "Unspecified",
|
||||
_OperationSource.kManual: "Manual",
|
||||
_OperationSource.kProprietaryRemote: "Proprietary Remote",
|
||||
_OperationSource.kKeypad: "Keypad",
|
||||
_OperationSource.kAuto: "Auto",
|
||||
_OperationSource.kButton: "Button",
|
||||
_OperationSource.kSchedule: "Schedule",
|
||||
_OperationSource.kRemote: "Remote",
|
||||
_OperationSource.kRfid: "RFID",
|
||||
_OperationSource.kBiometric: "Biometric",
|
||||
_OperationSource.kAliro: "Aliro",
|
||||
}
|
||||
|
||||
|
||||
DoorLockFeature = clusters.DoorLock.Bitmaps.Feature
|
||||
|
||||
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
@@ -98,17 +123,15 @@ class MatterLock(MatterEntity, LockEntity):
|
||||
node_event.data,
|
||||
)
|
||||
|
||||
# handle the DoorLock events
|
||||
# Handle the DoorLock events
|
||||
node_event_data: dict[str, int] = node_event.data or {}
|
||||
match node_event.event_id:
|
||||
case (
|
||||
clusters.DoorLock.Events.LockOperation.event_id
|
||||
): # Lock cluster event 2
|
||||
# update the changed_by attribute to indicate lock operation source
|
||||
case clusters.DoorLock.Events.LockOperation.event_id:
|
||||
operation_source: int = node_event_data.get("operationSource", -1)
|
||||
self._attr_changed_by = DOOR_LOCK_OPERATION_SOURCE.get(
|
||||
source_name = DOOR_LOCK_OPERATION_SOURCE.get(
|
||||
operation_source, "Unknown"
|
||||
)
|
||||
self._attr_changed_by = source_name
|
||||
self.async_write_ha_state()
|
||||
|
||||
@property
|
||||
@@ -146,7 +169,7 @@ class MatterLock(MatterEntity, LockEntity):
|
||||
code_bytes = code.encode() if code else None
|
||||
await self.send_device_command(
|
||||
command=clusters.DoorLock.Commands.LockDoor(code_bytes),
|
||||
timed_request_timeout_ms=1000,
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
async def async_unlock(self, **kwargs: Any) -> None:
|
||||
@@ -168,12 +191,12 @@ class MatterLock(MatterEntity, LockEntity):
|
||||
# and unlatch on the HA 'open' command.
|
||||
await self.send_device_command(
|
||||
command=clusters.DoorLock.Commands.UnboltDoor(code_bytes),
|
||||
timed_request_timeout_ms=1000,
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
else:
|
||||
await self.send_device_command(
|
||||
command=clusters.DoorLock.Commands.UnlockDoor(code_bytes),
|
||||
timed_request_timeout_ms=1000,
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
async def async_open(self, **kwargs: Any) -> None:
|
||||
@@ -190,7 +213,7 @@ class MatterLock(MatterEntity, LockEntity):
|
||||
code_bytes = code.encode() if code else None
|
||||
await self.send_device_command(
|
||||
command=clusters.DoorLock.Commands.UnlockDoor(code_bytes),
|
||||
timed_request_timeout_ms=1000,
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
@callback
|
||||
@@ -256,6 +279,109 @@ class MatterLock(MatterEntity, LockEntity):
|
||||
supported_features |= LockEntityFeature.OPEN
|
||||
self._attr_supported_features = supported_features
|
||||
|
||||
# --- Entity service methods ---
|
||||
|
||||
async def async_set_lock_user(self, **kwargs: Any) -> None:
|
||||
"""Set a lock user (full CRUD)."""
|
||||
try:
|
||||
await set_lock_user(
|
||||
self.matter_client,
|
||||
self._endpoint.node,
|
||||
user_index=kwargs.get(ATTR_USER_INDEX),
|
||||
user_name=kwargs.get(ATTR_USER_NAME),
|
||||
user_type=kwargs.get(ATTR_USER_TYPE),
|
||||
credential_rule=kwargs.get(ATTR_CREDENTIAL_RULE),
|
||||
)
|
||||
except MatterError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to set lock user on {self.entity_id}: {err}"
|
||||
) from err
|
||||
|
||||
async def async_clear_lock_user(self, **kwargs: Any) -> None:
|
||||
"""Clear a lock user."""
|
||||
try:
|
||||
await clear_lock_user(
|
||||
self.matter_client,
|
||||
self._endpoint.node,
|
||||
kwargs[ATTR_USER_INDEX],
|
||||
)
|
||||
except MatterError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to clear lock user on {self.entity_id}: {err}"
|
||||
) from err
|
||||
|
||||
async def async_get_lock_info(self) -> GetLockInfoResult:
|
||||
"""Get lock capabilities and configuration info."""
|
||||
try:
|
||||
return await get_lock_info(
|
||||
self.matter_client,
|
||||
self._endpoint.node,
|
||||
)
|
||||
except MatterError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to get lock info for {self.entity_id}: {err}"
|
||||
) from err
|
||||
|
||||
async def async_get_lock_users(self) -> GetLockUsersResult:
|
||||
"""Get all users from the lock."""
|
||||
try:
|
||||
return await get_lock_users(
|
||||
self.matter_client,
|
||||
self._endpoint.node,
|
||||
)
|
||||
except MatterError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to get lock users for {self.entity_id}: {err}"
|
||||
) from err
|
||||
|
||||
async def async_set_lock_credential(self, **kwargs: Any) -> SetLockCredentialResult:
|
||||
"""Set a credential on the lock."""
|
||||
try:
|
||||
return await set_lock_credential(
|
||||
self.matter_client,
|
||||
self._endpoint.node,
|
||||
credential_type=kwargs[ATTR_CREDENTIAL_TYPE],
|
||||
credential_data=kwargs[ATTR_CREDENTIAL_DATA],
|
||||
credential_index=kwargs.get(ATTR_CREDENTIAL_INDEX),
|
||||
user_index=kwargs.get(ATTR_USER_INDEX),
|
||||
user_status=kwargs.get(ATTR_USER_STATUS),
|
||||
user_type=kwargs.get(ATTR_USER_TYPE),
|
||||
)
|
||||
except MatterError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to set lock credential on {self.entity_id}: {err}"
|
||||
) from err
|
||||
|
||||
async def async_clear_lock_credential(self, **kwargs: Any) -> None:
|
||||
"""Clear a credential from the lock."""
|
||||
try:
|
||||
await clear_lock_credential(
|
||||
self.matter_client,
|
||||
self._endpoint.node,
|
||||
credential_type=kwargs[ATTR_CREDENTIAL_TYPE],
|
||||
credential_index=kwargs[ATTR_CREDENTIAL_INDEX],
|
||||
)
|
||||
except MatterError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to clear lock credential on {self.entity_id}: {err}"
|
||||
) from err
|
||||
|
||||
async def async_get_lock_credential_status(
|
||||
self, **kwargs: Any
|
||||
) -> GetLockCredentialStatusResult:
|
||||
"""Get the status of a credential slot on the lock."""
|
||||
try:
|
||||
return await get_lock_credential_status(
|
||||
self.matter_client,
|
||||
self._endpoint.node,
|
||||
credential_type=kwargs[ATTR_CREDENTIAL_TYPE],
|
||||
credential_index=kwargs[ATTR_CREDENTIAL_INDEX],
|
||||
)
|
||||
except MatterError as err:
|
||||
raise HomeAssistantError(
|
||||
f"Failed to get credential status for {self.entity_id}: {err}"
|
||||
) from err
|
||||
|
||||
|
||||
DISCOVERY_SCHEMAS = [
|
||||
MatterDiscoverySchema(
|
||||
|
||||
881
homeassistant/components/matter/lock_helpers.py
Normal file
881
homeassistant/components/matter/lock_helpers.py
Normal file
@@ -0,0 +1,881 @@
|
||||
"""Lock-specific helpers for the Matter integration.
|
||||
|
||||
Provides DoorLock cluster endpoint resolution, feature detection, and
|
||||
business logic for lock user/credential management.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any, TypedDict
|
||||
|
||||
from chip.clusters import Objects as clusters
|
||||
|
||||
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
|
||||
|
||||
from .const import (
|
||||
CLEAR_ALL_INDEX,
|
||||
CRED_TYPE_FACE,
|
||||
CRED_TYPE_FINGER_VEIN,
|
||||
CRED_TYPE_FINGERPRINT,
|
||||
CRED_TYPE_PIN,
|
||||
CRED_TYPE_RFID,
|
||||
CREDENTIAL_RULE_MAP,
|
||||
CREDENTIAL_RULE_REVERSE_MAP,
|
||||
CREDENTIAL_TYPE_MAP,
|
||||
CREDENTIAL_TYPE_REVERSE_MAP,
|
||||
LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
USER_STATUS_MAP,
|
||||
USER_STATUS_REVERSE_MAP,
|
||||
USER_TYPE_MAP,
|
||||
USER_TYPE_REVERSE_MAP,
|
||||
)
|
||||
|
||||
# Error translation keys (used in ServiceValidationError/HomeAssistantError)
|
||||
ERR_CREDENTIAL_TYPE_NOT_SUPPORTED = "credential_type_not_supported"
|
||||
ERR_INVALID_CREDENTIAL_DATA = "invalid_credential_data"
|
||||
|
||||
# SetCredential response status mapping (Matter DlStatus)
|
||||
_DlStatus = clusters.DoorLock.Enums.DlStatus
|
||||
SET_CREDENTIAL_STATUS_MAP: dict[int, str] = {
|
||||
_DlStatus.kSuccess: "success",
|
||||
_DlStatus.kFailure: "failure",
|
||||
_DlStatus.kDuplicate: "duplicate",
|
||||
_DlStatus.kOccupied: "occupied",
|
||||
}
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from matter_server.client import MatterClient
|
||||
from matter_server.client.models.node import MatterEndpoint, MatterNode
|
||||
|
||||
# DoorLock Feature bitmap from Matter SDK
|
||||
DoorLockFeature = clusters.DoorLock.Bitmaps.Feature
|
||||
|
||||
|
||||
# --- TypedDicts for service action responses ---
|
||||
|
||||
|
||||
class LockUserCredentialData(TypedDict):
|
||||
"""Credential data within a user response."""
|
||||
|
||||
type: str
|
||||
index: int | None
|
||||
|
||||
|
||||
class LockUserData(TypedDict):
|
||||
"""User data returned from lock queries."""
|
||||
|
||||
user_index: int | None
|
||||
user_name: str | None
|
||||
user_unique_id: int | None
|
||||
user_status: str
|
||||
user_type: str
|
||||
credential_rule: str
|
||||
credentials: list[LockUserCredentialData]
|
||||
next_user_index: int | None
|
||||
|
||||
|
||||
class SetLockUserResult(TypedDict):
|
||||
"""Result of set_lock_user service action."""
|
||||
|
||||
user_index: int
|
||||
|
||||
|
||||
class GetLockUsersResult(TypedDict):
|
||||
"""Result of get_lock_users service action."""
|
||||
|
||||
max_users: int
|
||||
users: list[LockUserData]
|
||||
|
||||
|
||||
class GetLockInfoResult(TypedDict):
|
||||
"""Result of get_lock_info service action."""
|
||||
|
||||
supports_user_management: bool
|
||||
supported_credential_types: list[str]
|
||||
max_users: int | None
|
||||
max_pin_users: int | None
|
||||
max_rfid_users: int | None
|
||||
max_credentials_per_user: int | None
|
||||
min_pin_length: int | None
|
||||
max_pin_length: int | None
|
||||
min_rfid_length: int | None
|
||||
max_rfid_length: int | None
|
||||
|
||||
|
||||
class SetLockCredentialResult(TypedDict):
|
||||
"""Result of set_lock_credential service action."""
|
||||
|
||||
credential_index: int
|
||||
user_index: int | None
|
||||
next_credential_index: int | None
|
||||
|
||||
|
||||
class GetLockCredentialStatusResult(TypedDict):
|
||||
"""Result of get_lock_credential_status service action."""
|
||||
|
||||
credential_exists: bool
|
||||
user_index: int | None
|
||||
next_credential_index: int | None
|
||||
|
||||
|
||||
def _get_lock_endpoint_from_node(node: MatterNode) -> MatterEndpoint | None:
|
||||
"""Get the DoorLock endpoint from a node.
|
||||
|
||||
Returns the first endpoint that has the DoorLock cluster, or None if not found.
|
||||
"""
|
||||
for endpoint in node.endpoints.values():
|
||||
if endpoint.has_cluster(clusters.DoorLock):
|
||||
return endpoint
|
||||
return None
|
||||
|
||||
|
||||
def _get_feature_map(endpoint: MatterEndpoint) -> int | None:
|
||||
"""Read the DoorLock FeatureMap attribute from an endpoint."""
|
||||
value: int | None = endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.FeatureMap
|
||||
)
|
||||
return value
|
||||
|
||||
|
||||
def _lock_supports_usr_feature(endpoint: MatterEndpoint) -> bool:
|
||||
"""Check if lock endpoint supports USR (User) feature.
|
||||
|
||||
The USR feature indicates the lock supports user and credential management
|
||||
commands like SetUser, GetUser, SetCredential, etc.
|
||||
"""
|
||||
feature_map = _get_feature_map(endpoint)
|
||||
if feature_map is None:
|
||||
return False
|
||||
return bool(feature_map & DoorLockFeature.kUser)
|
||||
|
||||
|
||||
# --- Pure utility functions ---
|
||||
|
||||
|
||||
def _get_attr(obj: Any, attr: str) -> Any:
|
||||
"""Get attribute from object or dict.
|
||||
|
||||
Matter SDK responses can be either dataclass objects or dicts depending on
|
||||
the SDK version and serialization context.
|
||||
"""
|
||||
if isinstance(obj, dict):
|
||||
return obj.get(attr)
|
||||
return getattr(obj, attr, None)
|
||||
|
||||
|
||||
def _get_supported_credential_types(feature_map: int) -> list[str]:
|
||||
"""Get list of supported credential types from feature map."""
|
||||
types = []
|
||||
if feature_map & DoorLockFeature.kPinCredential:
|
||||
types.append(CRED_TYPE_PIN)
|
||||
if feature_map & DoorLockFeature.kRfidCredential:
|
||||
types.append(CRED_TYPE_RFID)
|
||||
if feature_map & DoorLockFeature.kFingerCredentials:
|
||||
types.append(CRED_TYPE_FINGERPRINT)
|
||||
if feature_map & DoorLockFeature.kFaceCredentials:
|
||||
types.append(CRED_TYPE_FACE)
|
||||
return types
|
||||
|
||||
|
||||
def _format_user_response(user_data: Any) -> LockUserData | None:
|
||||
"""Format GetUser response to API response format.
|
||||
|
||||
Returns None if the user slot is empty (no userStatus).
|
||||
"""
|
||||
if user_data is None:
|
||||
return None
|
||||
|
||||
user_status = _get_attr(user_data, "userStatus")
|
||||
if user_status is None:
|
||||
return None
|
||||
|
||||
creds = _get_attr(user_data, "credentials")
|
||||
credentials: list[LockUserCredentialData] = [
|
||||
LockUserCredentialData(
|
||||
type=CREDENTIAL_TYPE_MAP.get(_get_attr(cred, "credentialType"), "unknown"),
|
||||
index=_get_attr(cred, "credentialIndex"),
|
||||
)
|
||||
for cred in (creds or [])
|
||||
]
|
||||
|
||||
return LockUserData(
|
||||
user_index=_get_attr(user_data, "userIndex"),
|
||||
user_name=_get_attr(user_data, "userName"),
|
||||
user_unique_id=_get_attr(user_data, "userUniqueID"),
|
||||
user_status=USER_STATUS_MAP.get(user_status, "unknown"),
|
||||
user_type=USER_TYPE_MAP.get(_get_attr(user_data, "userType"), "unknown"),
|
||||
credential_rule=CREDENTIAL_RULE_MAP.get(
|
||||
_get_attr(user_data, "credentialRule"), "unknown"
|
||||
),
|
||||
credentials=credentials,
|
||||
next_user_index=_get_attr(user_data, "nextUserIndex"),
|
||||
)
|
||||
|
||||
|
||||
# --- Credential management helpers ---
|
||||
|
||||
|
||||
async def _clear_user_credentials(
|
||||
matter_client: MatterClient,
|
||||
node_id: int,
|
||||
endpoint_id: int,
|
||||
user_index: int,
|
||||
) -> None:
|
||||
"""Clear all credentials for a specific user.
|
||||
|
||||
Fetches the user to get credential list, then clears each credential.
|
||||
"""
|
||||
get_user_response = await matter_client.send_device_command(
|
||||
node_id=node_id,
|
||||
endpoint_id=endpoint_id,
|
||||
command=clusters.DoorLock.Commands.GetUser(userIndex=user_index),
|
||||
)
|
||||
|
||||
creds = _get_attr(get_user_response, "credentials")
|
||||
if not creds:
|
||||
return
|
||||
|
||||
for cred in creds:
|
||||
cred_type = _get_attr(cred, "credentialType")
|
||||
cred_index = _get_attr(cred, "credentialIndex")
|
||||
await matter_client.send_device_command(
|
||||
node_id=node_id,
|
||||
endpoint_id=endpoint_id,
|
||||
command=clusters.DoorLock.Commands.ClearCredential(
|
||||
credential=clusters.DoorLock.Structs.CredentialStruct(
|
||||
credentialType=cred_type,
|
||||
credentialIndex=cred_index,
|
||||
),
|
||||
),
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
|
||||
class LockEndpointNotFoundError(HomeAssistantError):
|
||||
"""Lock endpoint not found on node."""
|
||||
|
||||
|
||||
class UsrFeatureNotSupportedError(ServiceValidationError):
|
||||
"""Lock does not support USR (user management) feature."""
|
||||
|
||||
|
||||
class UserSlotEmptyError(ServiceValidationError):
|
||||
"""User slot is empty."""
|
||||
|
||||
|
||||
class NoAvailableUserSlotsError(ServiceValidationError):
|
||||
"""No available user slots on the lock."""
|
||||
|
||||
|
||||
class CredentialTypeNotSupportedError(ServiceValidationError):
|
||||
"""Lock does not support the requested credential type."""
|
||||
|
||||
|
||||
class CredentialDataInvalidError(ServiceValidationError):
|
||||
"""Credential data fails validation."""
|
||||
|
||||
|
||||
class SetCredentialFailedError(HomeAssistantError):
|
||||
"""SetCredential command returned a non-success status."""
|
||||
|
||||
|
||||
def _get_lock_endpoint_or_raise(node: MatterNode) -> MatterEndpoint:
|
||||
"""Get the DoorLock endpoint from a node or raise an error."""
|
||||
lock_endpoint = _get_lock_endpoint_from_node(node)
|
||||
if lock_endpoint is None:
|
||||
raise LockEndpointNotFoundError("No lock endpoint found on this device")
|
||||
return lock_endpoint
|
||||
|
||||
|
||||
def _ensure_usr_support(lock_endpoint: MatterEndpoint) -> None:
|
||||
"""Ensure the lock endpoint supports USR (user management) feature.
|
||||
|
||||
Raises UsrFeatureNotSupportedError if the lock doesn't support user management.
|
||||
"""
|
||||
if not _lock_supports_usr_feature(lock_endpoint):
|
||||
raise UsrFeatureNotSupportedError(
|
||||
"Lock does not support user/credential management"
|
||||
)
|
||||
|
||||
|
||||
# --- High-level business logic functions ---
|
||||
|
||||
|
||||
async def get_lock_info(
|
||||
matter_client: MatterClient,
|
||||
node: MatterNode,
|
||||
) -> GetLockInfoResult:
|
||||
"""Get lock capabilities and configuration info.
|
||||
|
||||
Returns a typed dict with lock capability information.
|
||||
Raises HomeAssistantError if lock endpoint not found.
|
||||
"""
|
||||
lock_endpoint = _get_lock_endpoint_or_raise(node)
|
||||
supports_usr = _lock_supports_usr_feature(lock_endpoint)
|
||||
|
||||
# Get feature map for credential type detection
|
||||
feature_map = (
|
||||
lock_endpoint.get_attribute_value(None, clusters.DoorLock.Attributes.FeatureMap)
|
||||
or 0
|
||||
)
|
||||
|
||||
result = GetLockInfoResult(
|
||||
supports_user_management=supports_usr,
|
||||
supported_credential_types=_get_supported_credential_types(feature_map),
|
||||
max_users=None,
|
||||
max_pin_users=None,
|
||||
max_rfid_users=None,
|
||||
max_credentials_per_user=None,
|
||||
min_pin_length=None,
|
||||
max_pin_length=None,
|
||||
min_rfid_length=None,
|
||||
max_rfid_length=None,
|
||||
)
|
||||
|
||||
# Populate capacity info if USR feature is supported
|
||||
if supports_usr:
|
||||
result["max_users"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.NumberOfTotalUsersSupported
|
||||
)
|
||||
result["max_pin_users"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.NumberOfPINUsersSupported
|
||||
)
|
||||
result["max_rfid_users"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.NumberOfRFIDUsersSupported
|
||||
)
|
||||
result["max_credentials_per_user"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.NumberOfCredentialsSupportedPerUser
|
||||
)
|
||||
result["min_pin_length"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MinPINCodeLength
|
||||
)
|
||||
result["max_pin_length"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MaxPINCodeLength
|
||||
)
|
||||
result["min_rfid_length"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MinRFIDCodeLength
|
||||
)
|
||||
result["max_rfid_length"] = lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MaxRFIDCodeLength
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
async def set_lock_user(
|
||||
matter_client: MatterClient,
|
||||
node: MatterNode,
|
||||
*,
|
||||
user_index: int | None = None,
|
||||
user_name: str | None = None,
|
||||
user_unique_id: int | None = None,
|
||||
user_status: str | None = None,
|
||||
user_type: str | None = None,
|
||||
credential_rule: str | None = None,
|
||||
) -> SetLockUserResult:
|
||||
"""Add or update a user on the lock.
|
||||
|
||||
When user_status, user_type, or credential_rule is None, defaults are used
|
||||
for new users and existing values are preserved for modifications.
|
||||
|
||||
Returns typed dict with user_index on success.
|
||||
Raises HomeAssistantError on failure.
|
||||
"""
|
||||
lock_endpoint = _get_lock_endpoint_or_raise(node)
|
||||
_ensure_usr_support(lock_endpoint)
|
||||
|
||||
if user_index is None:
|
||||
# Adding new user - find first available slot
|
||||
max_users = (
|
||||
lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.NumberOfTotalUsersSupported
|
||||
)
|
||||
or 0
|
||||
)
|
||||
|
||||
for idx in range(1, max_users + 1):
|
||||
get_user_response = await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.GetUser(userIndex=idx),
|
||||
)
|
||||
if _get_attr(get_user_response, "userStatus") is None:
|
||||
user_index = idx
|
||||
break
|
||||
|
||||
if user_index is None:
|
||||
raise NoAvailableUserSlotsError("No available user slots on the lock")
|
||||
|
||||
user_status_enum = (
|
||||
USER_STATUS_REVERSE_MAP.get(
|
||||
user_status,
|
||||
clusters.DoorLock.Enums.UserStatusEnum.kOccupiedEnabled,
|
||||
)
|
||||
if user_status is not None
|
||||
else clusters.DoorLock.Enums.UserStatusEnum.kOccupiedEnabled
|
||||
)
|
||||
|
||||
await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.SetUser(
|
||||
operationType=clusters.DoorLock.Enums.DataOperationTypeEnum.kAdd,
|
||||
userIndex=user_index,
|
||||
userName=user_name,
|
||||
userUniqueID=user_unique_id,
|
||||
userStatus=user_status_enum,
|
||||
userType=USER_TYPE_REVERSE_MAP.get(
|
||||
user_type,
|
||||
clusters.DoorLock.Enums.UserTypeEnum.kUnrestrictedUser,
|
||||
)
|
||||
if user_type is not None
|
||||
else clusters.DoorLock.Enums.UserTypeEnum.kUnrestrictedUser,
|
||||
credentialRule=CREDENTIAL_RULE_REVERSE_MAP.get(
|
||||
credential_rule,
|
||||
clusters.DoorLock.Enums.CredentialRuleEnum.kSingle,
|
||||
)
|
||||
if credential_rule is not None
|
||||
else clusters.DoorLock.Enums.CredentialRuleEnum.kSingle,
|
||||
),
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
else:
|
||||
# Updating existing user - preserve existing values when not specified
|
||||
get_user_response = await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.GetUser(userIndex=user_index),
|
||||
)
|
||||
|
||||
if _get_attr(get_user_response, "userStatus") is None:
|
||||
raise UserSlotEmptyError(f"User slot {user_index} is empty")
|
||||
|
||||
resolved_user_name = (
|
||||
user_name
|
||||
if user_name is not None
|
||||
else _get_attr(get_user_response, "userName")
|
||||
)
|
||||
resolved_unique_id = (
|
||||
user_unique_id
|
||||
if user_unique_id is not None
|
||||
else _get_attr(get_user_response, "userUniqueID")
|
||||
)
|
||||
|
||||
resolved_status = (
|
||||
USER_STATUS_REVERSE_MAP[user_status]
|
||||
if user_status is not None
|
||||
else _get_attr(get_user_response, "userStatus")
|
||||
)
|
||||
|
||||
resolved_type = (
|
||||
USER_TYPE_REVERSE_MAP[user_type]
|
||||
if user_type is not None
|
||||
else _get_attr(get_user_response, "userType")
|
||||
)
|
||||
|
||||
resolved_rule = (
|
||||
CREDENTIAL_RULE_REVERSE_MAP[credential_rule]
|
||||
if credential_rule is not None
|
||||
else _get_attr(get_user_response, "credentialRule")
|
||||
)
|
||||
|
||||
await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.SetUser(
|
||||
operationType=clusters.DoorLock.Enums.DataOperationTypeEnum.kModify,
|
||||
userIndex=user_index,
|
||||
userName=resolved_user_name,
|
||||
userUniqueID=resolved_unique_id,
|
||||
userStatus=resolved_status,
|
||||
userType=resolved_type,
|
||||
credentialRule=resolved_rule,
|
||||
),
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
return SetLockUserResult(user_index=user_index)
|
||||
|
||||
|
||||
async def get_lock_users(
|
||||
matter_client: MatterClient,
|
||||
node: MatterNode,
|
||||
) -> GetLockUsersResult:
|
||||
"""Get all users from the lock.
|
||||
|
||||
Returns typed dict with users list and max_users capacity.
|
||||
Raises HomeAssistantError on failure.
|
||||
"""
|
||||
lock_endpoint = _get_lock_endpoint_or_raise(node)
|
||||
_ensure_usr_support(lock_endpoint)
|
||||
|
||||
max_users = (
|
||||
lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.NumberOfTotalUsersSupported
|
||||
)
|
||||
or 0
|
||||
)
|
||||
|
||||
users: list[LockUserData] = []
|
||||
current_index = 1
|
||||
|
||||
# Iterate through users using next_user_index for efficiency
|
||||
while current_index is not None and current_index <= max_users:
|
||||
get_user_response = await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.GetUser(
|
||||
userIndex=current_index,
|
||||
),
|
||||
)
|
||||
|
||||
user_data = _format_user_response(get_user_response)
|
||||
if user_data is not None:
|
||||
users.append(user_data)
|
||||
|
||||
# Move to next user index
|
||||
next_index = _get_attr(get_user_response, "nextUserIndex")
|
||||
if next_index is None or next_index <= current_index:
|
||||
break
|
||||
current_index = next_index
|
||||
|
||||
return GetLockUsersResult(
|
||||
max_users=max_users,
|
||||
users=users,
|
||||
)
|
||||
|
||||
|
||||
async def clear_lock_user(
|
||||
matter_client: MatterClient,
|
||||
node: MatterNode,
|
||||
user_index: int,
|
||||
) -> None:
|
||||
"""Clear a user from the lock, cleaning up credentials first.
|
||||
|
||||
Use index 0xFFFE (CLEAR_ALL_INDEX) to clear all users.
|
||||
Raises HomeAssistantError on failure.
|
||||
"""
|
||||
lock_endpoint = _get_lock_endpoint_or_raise(node)
|
||||
_ensure_usr_support(lock_endpoint)
|
||||
|
||||
if user_index == CLEAR_ALL_INDEX:
|
||||
# Clear all: clear all credentials first, then all users
|
||||
await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.ClearCredential(
|
||||
credential=None,
|
||||
),
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
else:
|
||||
# Clear credentials for this specific user before deleting them
|
||||
await _clear_user_credentials(
|
||||
matter_client,
|
||||
node.node_id,
|
||||
lock_endpoint.endpoint_id,
|
||||
user_index,
|
||||
)
|
||||
|
||||
await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.ClearUser(
|
||||
userIndex=user_index,
|
||||
),
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
|
||||
# --- Credential validation helpers ---
|
||||
|
||||
# Map credential type strings to the feature bit that must be set
|
||||
_CREDENTIAL_TYPE_FEATURE_MAP: dict[str, int] = {
|
||||
CRED_TYPE_PIN: DoorLockFeature.kPinCredential,
|
||||
CRED_TYPE_RFID: DoorLockFeature.kRfidCredential,
|
||||
CRED_TYPE_FINGERPRINT: DoorLockFeature.kFingerCredentials,
|
||||
CRED_TYPE_FINGER_VEIN: DoorLockFeature.kFingerCredentials,
|
||||
CRED_TYPE_FACE: DoorLockFeature.kFaceCredentials,
|
||||
}
|
||||
|
||||
|
||||
def _validate_credential_type_support(
|
||||
lock_endpoint: MatterEndpoint, credential_type: str
|
||||
) -> None:
|
||||
"""Validate the lock supports the requested credential type.
|
||||
|
||||
Raises CredentialTypeNotSupportedError if not supported.
|
||||
"""
|
||||
required_bit = _CREDENTIAL_TYPE_FEATURE_MAP.get(credential_type)
|
||||
if required_bit is None:
|
||||
raise CredentialTypeNotSupportedError(
|
||||
translation_domain="matter",
|
||||
translation_key=ERR_CREDENTIAL_TYPE_NOT_SUPPORTED,
|
||||
translation_placeholders={"credential_type": credential_type},
|
||||
)
|
||||
|
||||
feature_map = _get_feature_map(lock_endpoint) or 0
|
||||
if not (feature_map & required_bit):
|
||||
raise CredentialTypeNotSupportedError(
|
||||
translation_domain="matter",
|
||||
translation_key=ERR_CREDENTIAL_TYPE_NOT_SUPPORTED,
|
||||
translation_placeholders={"credential_type": credential_type},
|
||||
)
|
||||
|
||||
|
||||
def _validate_credential_data(
|
||||
lock_endpoint: MatterEndpoint, credential_type: str, credential_data: str
|
||||
) -> None:
|
||||
"""Validate credential data against lock constraints.
|
||||
|
||||
For PIN: checks digits-only and length against Min/MaxPINCodeLength.
|
||||
For RFID: checks valid hex and byte length against Min/MaxRFIDCodeLength.
|
||||
Raises CredentialDataInvalidError on failure.
|
||||
"""
|
||||
if credential_type == CRED_TYPE_PIN:
|
||||
if not credential_data.isdigit():
|
||||
raise CredentialDataInvalidError(
|
||||
translation_domain="matter",
|
||||
translation_key=ERR_INVALID_CREDENTIAL_DATA,
|
||||
translation_placeholders={"reason": "PIN must contain only digits"},
|
||||
)
|
||||
min_len = (
|
||||
lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MinPINCodeLength
|
||||
)
|
||||
or 0
|
||||
)
|
||||
max_len = (
|
||||
lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MaxPINCodeLength
|
||||
)
|
||||
or 255
|
||||
)
|
||||
if not min_len <= len(credential_data) <= max_len:
|
||||
raise CredentialDataInvalidError(
|
||||
translation_domain="matter",
|
||||
translation_key=ERR_INVALID_CREDENTIAL_DATA,
|
||||
translation_placeholders={
|
||||
"reason": (f"PIN length must be between {min_len} and {max_len}")
|
||||
},
|
||||
)
|
||||
|
||||
elif credential_type == CRED_TYPE_RFID:
|
||||
try:
|
||||
rfid_bytes = bytes.fromhex(credential_data)
|
||||
except ValueError as err:
|
||||
raise CredentialDataInvalidError(
|
||||
translation_domain="matter",
|
||||
translation_key=ERR_INVALID_CREDENTIAL_DATA,
|
||||
translation_placeholders={
|
||||
"reason": "RFID data must be valid hexadecimal"
|
||||
},
|
||||
) from err
|
||||
min_len = (
|
||||
lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MinRFIDCodeLength
|
||||
)
|
||||
or 0
|
||||
)
|
||||
max_len = (
|
||||
lock_endpoint.get_attribute_value(
|
||||
None, clusters.DoorLock.Attributes.MaxRFIDCodeLength
|
||||
)
|
||||
or 255
|
||||
)
|
||||
if not min_len <= len(rfid_bytes) <= max_len:
|
||||
raise CredentialDataInvalidError(
|
||||
translation_domain="matter",
|
||||
translation_key=ERR_INVALID_CREDENTIAL_DATA,
|
||||
translation_placeholders={
|
||||
"reason": (
|
||||
f"RFID data length must be between"
|
||||
f" {min_len} and {max_len} bytes"
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def _credential_data_to_bytes(credential_type: str, credential_data: str) -> bytes:
|
||||
"""Convert credential data string to bytes for the Matter command."""
|
||||
if credential_type == CRED_TYPE_RFID:
|
||||
return bytes.fromhex(credential_data)
|
||||
# PIN and other types: encode as UTF-8
|
||||
return credential_data.encode()
|
||||
|
||||
|
||||
# --- Credential business logic functions ---
|
||||
|
||||
|
||||
async def set_lock_credential(
|
||||
matter_client: MatterClient,
|
||||
node: MatterNode,
|
||||
*,
|
||||
credential_type: str,
|
||||
credential_data: str,
|
||||
credential_index: int | None = None,
|
||||
user_index: int | None = None,
|
||||
user_status: str | None = None,
|
||||
user_type: str | None = None,
|
||||
) -> SetLockCredentialResult:
|
||||
"""Add or modify a credential on the lock.
|
||||
|
||||
Returns typed dict with credential_index, user_index, and next_credential_index.
|
||||
Raises ServiceValidationError for validation failures.
|
||||
Raises HomeAssistantError for device communication failures.
|
||||
"""
|
||||
lock_endpoint = _get_lock_endpoint_or_raise(node)
|
||||
_ensure_usr_support(lock_endpoint)
|
||||
_validate_credential_type_support(lock_endpoint, credential_type)
|
||||
_validate_credential_data(lock_endpoint, credential_type, credential_data)
|
||||
|
||||
cred_type_int = CREDENTIAL_TYPE_REVERSE_MAP[credential_type]
|
||||
cred_data_bytes = _credential_data_to_bytes(credential_type, credential_data)
|
||||
|
||||
# Determine operation type and credential index
|
||||
operation_type = clusters.DoorLock.Enums.DataOperationTypeEnum.kAdd
|
||||
|
||||
if credential_index is None:
|
||||
# Auto-find first available credential slot
|
||||
max_creds = (
|
||||
lock_endpoint.get_attribute_value(
|
||||
None,
|
||||
clusters.DoorLock.Attributes.NumberOfCredentialsSupportedPerUser,
|
||||
)
|
||||
or 5
|
||||
)
|
||||
for idx in range(1, max_creds + 1):
|
||||
status_response = await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.GetCredentialStatus(
|
||||
credential=clusters.DoorLock.Structs.CredentialStruct(
|
||||
credentialType=cred_type_int,
|
||||
credentialIndex=idx,
|
||||
),
|
||||
),
|
||||
)
|
||||
if not _get_attr(status_response, "credentialExists"):
|
||||
credential_index = idx
|
||||
break
|
||||
|
||||
if credential_index is None:
|
||||
raise NoAvailableUserSlotsError("No available credential slots on the lock")
|
||||
else:
|
||||
# Check if slot is occupied to determine Add vs Modify
|
||||
status_response = await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.GetCredentialStatus(
|
||||
credential=clusters.DoorLock.Structs.CredentialStruct(
|
||||
credentialType=cred_type_int,
|
||||
credentialIndex=credential_index,
|
||||
),
|
||||
),
|
||||
)
|
||||
if _get_attr(status_response, "credentialExists"):
|
||||
operation_type = clusters.DoorLock.Enums.DataOperationTypeEnum.kModify
|
||||
|
||||
# Resolve optional user_status and user_type enums
|
||||
resolved_user_status = (
|
||||
USER_STATUS_REVERSE_MAP.get(user_status) if user_status is not None else None
|
||||
)
|
||||
resolved_user_type = (
|
||||
USER_TYPE_REVERSE_MAP.get(user_type) if user_type is not None else None
|
||||
)
|
||||
|
||||
set_cred_response = await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.SetCredential(
|
||||
operationType=operation_type,
|
||||
credential=clusters.DoorLock.Structs.CredentialStruct(
|
||||
credentialType=cred_type_int,
|
||||
credentialIndex=credential_index,
|
||||
),
|
||||
credentialData=cred_data_bytes,
|
||||
userIndex=user_index,
|
||||
userStatus=resolved_user_status,
|
||||
userType=resolved_user_type,
|
||||
),
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
status_code = _get_attr(set_cred_response, "status")
|
||||
status_str = SET_CREDENTIAL_STATUS_MAP.get(status_code, f"unknown({status_code})")
|
||||
if status_str != "success":
|
||||
raise SetCredentialFailedError(
|
||||
translation_domain="matter",
|
||||
translation_key="set_credential_failed",
|
||||
translation_placeholders={"status": status_str},
|
||||
)
|
||||
|
||||
return SetLockCredentialResult(
|
||||
credential_index=credential_index,
|
||||
user_index=_get_attr(set_cred_response, "userIndex"),
|
||||
next_credential_index=_get_attr(set_cred_response, "nextCredentialIndex"),
|
||||
)
|
||||
|
||||
|
||||
async def clear_lock_credential(
|
||||
matter_client: MatterClient,
|
||||
node: MatterNode,
|
||||
*,
|
||||
credential_type: str,
|
||||
credential_index: int,
|
||||
) -> None:
|
||||
"""Clear a credential from the lock.
|
||||
|
||||
Raises HomeAssistantError on failure.
|
||||
"""
|
||||
lock_endpoint = _get_lock_endpoint_or_raise(node)
|
||||
_ensure_usr_support(lock_endpoint)
|
||||
|
||||
cred_type_int = CREDENTIAL_TYPE_REVERSE_MAP[credential_type]
|
||||
|
||||
await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.ClearCredential(
|
||||
credential=clusters.DoorLock.Structs.CredentialStruct(
|
||||
credentialType=cred_type_int,
|
||||
credentialIndex=credential_index,
|
||||
),
|
||||
),
|
||||
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
|
||||
)
|
||||
|
||||
|
||||
async def get_lock_credential_status(
|
||||
matter_client: MatterClient,
|
||||
node: MatterNode,
|
||||
*,
|
||||
credential_type: str,
|
||||
credential_index: int,
|
||||
) -> GetLockCredentialStatusResult:
|
||||
"""Get the status of a credential slot on the lock.
|
||||
|
||||
Returns typed dict with credential_exists, user_index, next_credential_index.
|
||||
Raises HomeAssistantError on failure.
|
||||
"""
|
||||
lock_endpoint = _get_lock_endpoint_or_raise(node)
|
||||
_ensure_usr_support(lock_endpoint)
|
||||
|
||||
cred_type_int = CREDENTIAL_TYPE_REVERSE_MAP[credential_type]
|
||||
|
||||
response = await matter_client.send_device_command(
|
||||
node_id=node.node_id,
|
||||
endpoint_id=lock_endpoint.endpoint_id,
|
||||
command=clusters.DoorLock.Commands.GetCredentialStatus(
|
||||
credential=clusters.DoorLock.Structs.CredentialStruct(
|
||||
credentialType=cred_type_int,
|
||||
credentialIndex=credential_index,
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
return GetLockCredentialStatusResult(
|
||||
credential_exists=bool(_get_attr(response, "credentialExists")),
|
||||
user_index=_get_attr(response, "userIndex"),
|
||||
next_credential_index=_get_attr(response, "nextCredentialIndex"),
|
||||
)
|
||||
@@ -4,11 +4,27 @@ from __future__ import annotations
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.lock import DOMAIN as LOCK_DOMAIN
|
||||
from homeassistant.components.water_heater import DOMAIN as WATER_HEATER_DOMAIN
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.core import HomeAssistant, SupportsResponse, callback
|
||||
from homeassistant.helpers import config_validation as cv, service
|
||||
|
||||
from .const import DOMAIN
|
||||
from .const import (
|
||||
ATTR_CREDENTIAL_DATA,
|
||||
ATTR_CREDENTIAL_INDEX,
|
||||
ATTR_CREDENTIAL_RULE,
|
||||
ATTR_CREDENTIAL_TYPE,
|
||||
ATTR_USER_INDEX,
|
||||
ATTR_USER_NAME,
|
||||
ATTR_USER_STATUS,
|
||||
ATTR_USER_TYPE,
|
||||
CLEAR_ALL_INDEX,
|
||||
CREDENTIAL_RULE_REVERSE_MAP,
|
||||
CREDENTIAL_TYPE_REVERSE_MAP,
|
||||
DOMAIN,
|
||||
SERVICE_CREDENTIAL_TYPES,
|
||||
USER_TYPE_REVERSE_MAP,
|
||||
)
|
||||
|
||||
ATTR_DURATION = "duration"
|
||||
ATTR_EMERGENCY_BOOST = "emergency_boost"
|
||||
@@ -36,3 +52,108 @@ def async_setup_services(hass: HomeAssistant) -> None:
|
||||
},
|
||||
func="async_set_boost",
|
||||
)
|
||||
|
||||
# Lock services - Full user CRUD
|
||||
service.async_register_platform_entity_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"set_lock_user",
|
||||
entity_domain=LOCK_DOMAIN,
|
||||
schema={
|
||||
vol.Optional(ATTR_USER_INDEX): vol.All(vol.Coerce(int), vol.Range(min=1)),
|
||||
vol.Optional(ATTR_USER_NAME): vol.Any(str, None),
|
||||
vol.Optional(ATTR_USER_TYPE): vol.In(USER_TYPE_REVERSE_MAP.keys()),
|
||||
vol.Optional(ATTR_CREDENTIAL_RULE): vol.In(
|
||||
CREDENTIAL_RULE_REVERSE_MAP.keys()
|
||||
),
|
||||
},
|
||||
func="async_set_lock_user",
|
||||
)
|
||||
|
||||
service.async_register_platform_entity_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"clear_lock_user",
|
||||
entity_domain=LOCK_DOMAIN,
|
||||
schema={
|
||||
vol.Required(ATTR_USER_INDEX): vol.All(
|
||||
vol.Coerce(int),
|
||||
vol.Any(vol.Range(min=1), CLEAR_ALL_INDEX),
|
||||
),
|
||||
},
|
||||
func="async_clear_lock_user",
|
||||
)
|
||||
|
||||
# Lock services - Query operations
|
||||
service.async_register_platform_entity_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"get_lock_info",
|
||||
entity_domain=LOCK_DOMAIN,
|
||||
schema={},
|
||||
func="async_get_lock_info",
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
service.async_register_platform_entity_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"get_lock_users",
|
||||
entity_domain=LOCK_DOMAIN,
|
||||
schema={},
|
||||
func="async_get_lock_users",
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
# Lock services - Credential management
|
||||
service.async_register_platform_entity_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"set_lock_credential",
|
||||
entity_domain=LOCK_DOMAIN,
|
||||
schema={
|
||||
vol.Required(ATTR_CREDENTIAL_TYPE): vol.In(SERVICE_CREDENTIAL_TYPES),
|
||||
vol.Required(ATTR_CREDENTIAL_DATA): str,
|
||||
vol.Optional(ATTR_CREDENTIAL_INDEX): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0)
|
||||
),
|
||||
vol.Optional(ATTR_USER_INDEX): vol.All(vol.Coerce(int), vol.Range(min=1)),
|
||||
vol.Optional(ATTR_USER_STATUS): vol.In(
|
||||
["occupied_enabled", "occupied_disabled"]
|
||||
),
|
||||
vol.Optional(ATTR_USER_TYPE): vol.In(USER_TYPE_REVERSE_MAP.keys()),
|
||||
},
|
||||
func="async_set_lock_credential",
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
service.async_register_platform_entity_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"clear_lock_credential",
|
||||
entity_domain=LOCK_DOMAIN,
|
||||
schema={
|
||||
vol.Required(ATTR_CREDENTIAL_TYPE): vol.In(SERVICE_CREDENTIAL_TYPES),
|
||||
vol.Required(ATTR_CREDENTIAL_INDEX): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0)
|
||||
),
|
||||
},
|
||||
func="async_clear_lock_credential",
|
||||
)
|
||||
|
||||
service.async_register_platform_entity_service(
|
||||
hass,
|
||||
DOMAIN,
|
||||
"get_lock_credential_status",
|
||||
entity_domain=LOCK_DOMAIN,
|
||||
schema={
|
||||
vol.Required(ATTR_CREDENTIAL_TYPE): vol.In(
|
||||
CREDENTIAL_TYPE_REVERSE_MAP.keys()
|
||||
),
|
||||
vol.Required(ATTR_CREDENTIAL_INDEX): vol.All(
|
||||
vol.Coerce(int), vol.Range(min=0)
|
||||
),
|
||||
},
|
||||
func="async_get_lock_credential_status",
|
||||
supports_response=SupportsResponse.ONLY,
|
||||
)
|
||||
|
||||
@@ -1,3 +1,177 @@
|
||||
clear_lock_credential:
|
||||
target:
|
||||
entity:
|
||||
domain: lock
|
||||
integration: matter
|
||||
fields:
|
||||
credential_type:
|
||||
selector:
|
||||
select:
|
||||
options:
|
||||
- pin
|
||||
- rfid
|
||||
- fingerprint
|
||||
- finger_vein
|
||||
- face
|
||||
required: true
|
||||
credential_index:
|
||||
selector:
|
||||
number:
|
||||
min: 0
|
||||
max: 65534
|
||||
step: 1
|
||||
mode: box
|
||||
required: true
|
||||
|
||||
clear_lock_user:
|
||||
target:
|
||||
entity:
|
||||
domain: lock
|
||||
integration: matter
|
||||
fields:
|
||||
user_index:
|
||||
selector:
|
||||
number:
|
||||
min: 1
|
||||
max: 65534
|
||||
step: 1
|
||||
mode: box
|
||||
required: true
|
||||
|
||||
get_lock_credential_status:
|
||||
target:
|
||||
entity:
|
||||
domain: lock
|
||||
integration: matter
|
||||
fields:
|
||||
credential_type:
|
||||
selector:
|
||||
select:
|
||||
options:
|
||||
- programming_pin
|
||||
- pin
|
||||
- rfid
|
||||
- fingerprint
|
||||
- finger_vein
|
||||
- face
|
||||
- aliro_credential_issuer_key
|
||||
- aliro_evictable_endpoint_key
|
||||
- aliro_non_evictable_endpoint_key
|
||||
required: true
|
||||
credential_index:
|
||||
selector:
|
||||
number:
|
||||
min: 0
|
||||
max: 65534
|
||||
step: 1
|
||||
mode: box
|
||||
required: true
|
||||
|
||||
get_lock_info:
|
||||
target:
|
||||
entity:
|
||||
domain: lock
|
||||
integration: matter
|
||||
|
||||
get_lock_users:
|
||||
target:
|
||||
entity:
|
||||
domain: lock
|
||||
integration: matter
|
||||
|
||||
set_lock_credential:
|
||||
target:
|
||||
entity:
|
||||
domain: lock
|
||||
integration: matter
|
||||
fields:
|
||||
credential_type:
|
||||
selector:
|
||||
select:
|
||||
options:
|
||||
- pin
|
||||
- rfid
|
||||
- fingerprint
|
||||
- finger_vein
|
||||
- face
|
||||
required: true
|
||||
credential_data:
|
||||
selector:
|
||||
text:
|
||||
required: true
|
||||
credential_index:
|
||||
selector:
|
||||
number:
|
||||
min: 0
|
||||
max: 65534
|
||||
step: 1
|
||||
mode: box
|
||||
user_index:
|
||||
selector:
|
||||
number:
|
||||
min: 1
|
||||
max: 65534
|
||||
step: 1
|
||||
mode: box
|
||||
user_status:
|
||||
selector:
|
||||
select:
|
||||
options:
|
||||
- occupied_enabled
|
||||
- occupied_disabled
|
||||
user_type:
|
||||
selector:
|
||||
select:
|
||||
options:
|
||||
- unrestricted_user
|
||||
- year_day_schedule_user
|
||||
- week_day_schedule_user
|
||||
- programming_user
|
||||
- non_access_user
|
||||
- forced_user
|
||||
- disposable_user
|
||||
- expiring_user
|
||||
- schedule_restricted_user
|
||||
- remote_only_user
|
||||
|
||||
set_lock_user:
|
||||
target:
|
||||
entity:
|
||||
domain: lock
|
||||
integration: matter
|
||||
fields:
|
||||
user_index:
|
||||
selector:
|
||||
number:
|
||||
min: 1
|
||||
max: 255
|
||||
step: 1
|
||||
mode: box
|
||||
user_name:
|
||||
selector:
|
||||
text:
|
||||
user_type:
|
||||
selector:
|
||||
select:
|
||||
options:
|
||||
- unrestricted_user
|
||||
- year_day_schedule_user
|
||||
- week_day_schedule_user
|
||||
- programming_user
|
||||
- non_access_user
|
||||
- forced_user
|
||||
- disposable_user
|
||||
- expiring_user
|
||||
- schedule_restricted_user
|
||||
- remote_only_user
|
||||
credential_rule:
|
||||
selector:
|
||||
select:
|
||||
options:
|
||||
- single
|
||||
- dual
|
||||
- tri
|
||||
|
||||
water_heater_boost:
|
||||
target:
|
||||
entity:
|
||||
|
||||
@@ -619,6 +619,17 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"exceptions": {
|
||||
"credential_type_not_supported": {
|
||||
"message": "The lock does not support credential type `{credential_type}`."
|
||||
},
|
||||
"invalid_credential_data": {
|
||||
"message": "Invalid credential data: {reason}."
|
||||
},
|
||||
"set_credential_failed": {
|
||||
"message": "Failed to set credential: lock returned status `{status}`."
|
||||
}
|
||||
},
|
||||
"issues": {
|
||||
"server_version_version_too_new": {
|
||||
"description": "The version of the Matter Server you are currently running is too new for this version of Home Assistant. Please update Home Assistant or downgrade the Matter Server to an older version to fix this issue.",
|
||||
@@ -630,6 +641,52 @@
|
||||
}
|
||||
},
|
||||
"services": {
|
||||
"clear_lock_credential": {
|
||||
"description": "Removes a credential from the lock.",
|
||||
"fields": {
|
||||
"credential_index": {
|
||||
"description": "The credential slot index to clear.",
|
||||
"name": "Credential index"
|
||||
},
|
||||
"credential_type": {
|
||||
"description": "The type of credential to clear.",
|
||||
"name": "Credential type"
|
||||
}
|
||||
},
|
||||
"name": "Clear lock credential"
|
||||
},
|
||||
"clear_lock_user": {
|
||||
"description": "Deletes a lock user and all associated credentials. Use index 65534 to clear all users.",
|
||||
"fields": {
|
||||
"user_index": {
|
||||
"description": "The user slot index (1-based) to clear, or 65534 to clear all.",
|
||||
"name": "User index"
|
||||
}
|
||||
},
|
||||
"name": "Clear lock user"
|
||||
},
|
||||
"get_lock_credential_status": {
|
||||
"description": "Returns the status of a credential slot on the lock.",
|
||||
"fields": {
|
||||
"credential_index": {
|
||||
"description": "The credential slot index to query.",
|
||||
"name": "Credential index"
|
||||
},
|
||||
"credential_type": {
|
||||
"description": "The type of credential to query.",
|
||||
"name": "Credential type"
|
||||
}
|
||||
},
|
||||
"name": "Get lock credential status"
|
||||
},
|
||||
"get_lock_info": {
|
||||
"description": "Returns lock capabilities including supported credential types, user capacity, and PIN length constraints.",
|
||||
"name": "Get lock info"
|
||||
},
|
||||
"get_lock_users": {
|
||||
"description": "Returns all users configured on the lock with their credentials.",
|
||||
"name": "Get lock users"
|
||||
},
|
||||
"open_commissioning_window": {
|
||||
"description": "Allows adding one of your devices to another Matter network by opening the commissioning window for this Matter device for 60 seconds.",
|
||||
"fields": {
|
||||
@@ -640,6 +697,58 @@
|
||||
},
|
||||
"name": "Open commissioning window"
|
||||
},
|
||||
"set_lock_credential": {
|
||||
"description": "Adds or updates a credential on the lock.",
|
||||
"fields": {
|
||||
"credential_data": {
|
||||
"description": "The credential data. For PIN: digits only. For RFID: hexadecimal string.",
|
||||
"name": "Credential data"
|
||||
},
|
||||
"credential_index": {
|
||||
"description": "The credential slot index. Leave empty to auto-find an available slot.",
|
||||
"name": "Credential index"
|
||||
},
|
||||
"credential_type": {
|
||||
"description": "The type of credential (e.g., pin, rfid, fingerprint).",
|
||||
"name": "Credential type"
|
||||
},
|
||||
"user_index": {
|
||||
"description": "The user index to associate the credential with. Leave empty for automatic assignment.",
|
||||
"name": "User index"
|
||||
},
|
||||
"user_status": {
|
||||
"description": "The user status to set when creating a new user for this credential.",
|
||||
"name": "User status"
|
||||
},
|
||||
"user_type": {
|
||||
"description": "The user type to set when creating a new user for this credential.",
|
||||
"name": "User type"
|
||||
}
|
||||
},
|
||||
"name": "Set lock credential"
|
||||
},
|
||||
"set_lock_user": {
|
||||
"description": "Creates or updates a lock user.",
|
||||
"fields": {
|
||||
"credential_rule": {
|
||||
"description": "The credential rule for the user.",
|
||||
"name": "Credential rule"
|
||||
},
|
||||
"user_index": {
|
||||
"description": "The user slot index (1-based). Leave empty to auto-find an available slot.",
|
||||
"name": "User index"
|
||||
},
|
||||
"user_name": {
|
||||
"description": "The name for the user.",
|
||||
"name": "User name"
|
||||
},
|
||||
"user_type": {
|
||||
"description": "The type of user to create.",
|
||||
"name": "User type"
|
||||
}
|
||||
},
|
||||
"name": "Set lock user"
|
||||
},
|
||||
"water_heater_boost": {
|
||||
"description": "Enables water heater boost for a specific duration.",
|
||||
"fields": {
|
||||
|
||||
@@ -128,6 +128,7 @@ class MonopriceZone(MediaPlayerEntity):
|
||||
)
|
||||
_attr_has_entity_name = True
|
||||
_attr_name = None
|
||||
_attr_volume_step = 1 / MAX_VOLUME
|
||||
|
||||
def __init__(self, monoprice, sources, namespace, zone_id):
|
||||
"""Initialize new zone."""
|
||||
@@ -211,17 +212,3 @@ class MonopriceZone(MediaPlayerEntity):
|
||||
def set_volume_level(self, volume: float) -> None:
|
||||
"""Set volume level, range 0..1."""
|
||||
self._monoprice.set_volume(self._zone_id, round(volume * MAX_VOLUME))
|
||||
|
||||
def volume_up(self) -> None:
|
||||
"""Volume up the media player."""
|
||||
if self.volume_level is None:
|
||||
return
|
||||
volume = round(self.volume_level * MAX_VOLUME)
|
||||
self._monoprice.set_volume(self._zone_id, min(volume + 1, MAX_VOLUME))
|
||||
|
||||
def volume_down(self) -> None:
|
||||
"""Volume down media player."""
|
||||
if self.volume_level is None:
|
||||
return
|
||||
volume = round(self.volume_level * MAX_VOLUME)
|
||||
self._monoprice.set_volume(self._zone_id, max(volume - 1, 0))
|
||||
|
||||
@@ -93,6 +93,7 @@ class MpdDevice(MediaPlayerEntity):
|
||||
_attr_media_content_type = MediaType.MUSIC
|
||||
_attr_has_entity_name = True
|
||||
_attr_name = None
|
||||
_attr_volume_step = 0.05
|
||||
|
||||
def __init__(
|
||||
self, server: str, port: int, password: str | None, unique_id: str
|
||||
@@ -393,24 +394,6 @@ class MpdDevice(MediaPlayerEntity):
|
||||
if "volume" in self._status:
|
||||
await self._client.setvol(int(volume * 100))
|
||||
|
||||
async def async_volume_up(self) -> None:
|
||||
"""Service to send the MPD the command for volume up."""
|
||||
async with self.connection():
|
||||
if "volume" in self._status:
|
||||
current_volume = int(self._status["volume"])
|
||||
|
||||
if current_volume <= 100:
|
||||
self._client.setvol(current_volume + 5)
|
||||
|
||||
async def async_volume_down(self) -> None:
|
||||
"""Service to send the MPD the command for volume down."""
|
||||
async with self.connection():
|
||||
if "volume" in self._status:
|
||||
current_volume = int(self._status["volume"])
|
||||
|
||||
if current_volume >= 0:
|
||||
await self._client.setvol(current_volume - 5)
|
||||
|
||||
async def async_media_play(self) -> None:
|
||||
"""Service to send the MPD the command for play/pause."""
|
||||
async with self.connection():
|
||||
|
||||
@@ -198,8 +198,10 @@ class NADtcp(MediaPlayerEntity):
|
||||
self._nad_receiver = NADReceiverTCP(config.get(CONF_HOST))
|
||||
self._min_vol = (config[CONF_MIN_VOLUME] + 90) * 2 # from dB to nad vol (0-200)
|
||||
self._max_vol = (config[CONF_MAX_VOLUME] + 90) * 2 # from dB to nad vol (0-200)
|
||||
self._volume_step = config[CONF_VOLUME_STEP]
|
||||
self._nad_volume = None
|
||||
vol_range = self._max_vol - self._min_vol
|
||||
if vol_range:
|
||||
self._attr_volume_step = 2 * config[CONF_VOLUME_STEP] / vol_range
|
||||
self._source_list = self._nad_receiver.available_sources()
|
||||
|
||||
def turn_off(self) -> None:
|
||||
@@ -210,14 +212,6 @@ class NADtcp(MediaPlayerEntity):
|
||||
"""Turn the media player on."""
|
||||
self._nad_receiver.power_on()
|
||||
|
||||
def volume_up(self) -> None:
|
||||
"""Step volume up in the configured increments."""
|
||||
self._nad_receiver.set_volume(self._nad_volume + 2 * self._volume_step)
|
||||
|
||||
def volume_down(self) -> None:
|
||||
"""Step volume down in the configured increments."""
|
||||
self._nad_receiver.set_volume(self._nad_volume - 2 * self._volume_step)
|
||||
|
||||
def set_volume_level(self, volume: float) -> None:
|
||||
"""Set volume level, range 0..1."""
|
||||
nad_volume_to_set = int(
|
||||
|
||||
@@ -4,13 +4,19 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
from powerfox import DeviceType, Powerfox, PowerfoxConnectionError
|
||||
from powerfox import (
|
||||
DeviceType,
|
||||
Powerfox,
|
||||
PowerfoxAuthenticationError,
|
||||
PowerfoxConnectionError,
|
||||
)
|
||||
|
||||
from homeassistant.const import CONF_EMAIL, CONF_PASSWORD, Platform
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import ConfigEntryNotReady
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
|
||||
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
||||
|
||||
from .const import DOMAIN
|
||||
from .coordinator import (
|
||||
PowerfoxConfigEntry,
|
||||
PowerfoxDataUpdateCoordinator,
|
||||
@@ -30,9 +36,18 @@ async def async_setup_entry(hass: HomeAssistant, entry: PowerfoxConfigEntry) ->
|
||||
|
||||
try:
|
||||
devices = await client.all_devices()
|
||||
except PowerfoxAuthenticationError as err:
|
||||
await client.close()
|
||||
raise ConfigEntryAuthFailed(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="auth_failed",
|
||||
) from err
|
||||
except PowerfoxConnectionError as err:
|
||||
await client.close()
|
||||
raise ConfigEntryNotReady from err
|
||||
raise ConfigEntryNotReady(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="connection_error",
|
||||
) from err
|
||||
|
||||
coordinators: list[
|
||||
PowerfoxDataUpdateCoordinator | PowerfoxReportDataUpdateCoordinator
|
||||
|
||||
@@ -59,18 +59,24 @@ class PowerfoxBaseCoordinator[T](DataUpdateCoordinator[T]):
|
||||
except PowerfoxAuthenticationError as err:
|
||||
raise ConfigEntryAuthFailed(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="invalid_auth",
|
||||
translation_placeholders={"error": str(err)},
|
||||
translation_key="auth_failed",
|
||||
) from err
|
||||
except (
|
||||
PowerfoxConnectionError,
|
||||
PowerfoxNoDataError,
|
||||
PowerfoxPrivacyError,
|
||||
) as err:
|
||||
except PowerfoxConnectionError as err:
|
||||
raise UpdateFailed(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="update_failed",
|
||||
translation_placeholders={"error": str(err)},
|
||||
translation_key="connection_error",
|
||||
) from err
|
||||
except PowerfoxNoDataError as err:
|
||||
raise UpdateFailed(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="no_data_error",
|
||||
translation_placeholders={"device_name": self.device.name},
|
||||
) from err
|
||||
except PowerfoxPrivacyError as err:
|
||||
raise UpdateFailed(
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="privacy_error",
|
||||
translation_placeholders={"device_name": self.device.name},
|
||||
) from err
|
||||
|
||||
async def _async_fetch_data(self) -> T:
|
||||
|
||||
@@ -116,11 +116,17 @@
|
||||
}
|
||||
},
|
||||
"exceptions": {
|
||||
"invalid_auth": {
|
||||
"message": "Error while authenticating with the Powerfox service: {error}"
|
||||
"auth_failed": {
|
||||
"message": "Authentication with the Powerfox service failed. Please re-authenticate your account."
|
||||
},
|
||||
"update_failed": {
|
||||
"message": "Error while updating the Powerfox service: {error}"
|
||||
"connection_error": {
|
||||
"message": "Could not connect to the Powerfox service. Please check your network connection."
|
||||
},
|
||||
"no_data_error": {
|
||||
"message": "No data available for device \"{device_name}\". The device may not have reported data yet."
|
||||
},
|
||||
"privacy_error": {
|
||||
"message": "Data for device \"{device_name}\" is restricted due to privacy settings in the Powerfox app."
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -66,6 +66,7 @@ from .repairs import (
|
||||
from .services import async_setup_services
|
||||
from .utils import (
|
||||
async_create_issue_unsupported_firmware,
|
||||
async_migrate_rpc_sensor_description_unique_ids,
|
||||
async_migrate_rpc_virtual_components_unique_ids,
|
||||
get_coap_context,
|
||||
get_device_entry_gen,
|
||||
@@ -296,6 +297,12 @@ async def _async_setup_rpc_entry(hass: HomeAssistant, entry: ShellyConfigEntry)
|
||||
runtime_data = entry.runtime_data
|
||||
runtime_data.platforms = RPC_SLEEPING_PLATFORMS
|
||||
|
||||
await er.async_migrate_entries(
|
||||
hass,
|
||||
entry.entry_id,
|
||||
async_migrate_rpc_sensor_description_unique_ids,
|
||||
)
|
||||
|
||||
if sleep_period == 0:
|
||||
# Not a sleeping device, finish setup
|
||||
LOGGER.debug("Setting up online RPC device %s", entry.title)
|
||||
|
||||
@@ -1220,7 +1220,7 @@ RPC_SENSORS: Final = {
|
||||
entity_category=EntityCategory.DIAGNOSTIC,
|
||||
use_polling_coordinator=True,
|
||||
),
|
||||
"temperature_0": RpcSensorDescription(
|
||||
"temperature_tc": RpcSensorDescription(
|
||||
key="temperature",
|
||||
sub_key="tC",
|
||||
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
|
||||
@@ -1249,7 +1249,7 @@ RPC_SENSORS: Final = {
|
||||
entity_category=EntityCategory.DIAGNOSTIC,
|
||||
use_polling_coordinator=True,
|
||||
),
|
||||
"humidity_0": RpcSensorDescription(
|
||||
"humidity_rh": RpcSensorDescription(
|
||||
key="humidity",
|
||||
sub_key="rh",
|
||||
native_unit_of_measurement=PERCENTAGE,
|
||||
|
||||
@@ -969,6 +969,30 @@ def format_ble_addr(ble_addr: str) -> str:
|
||||
return ble_addr.replace(":", "").upper()
|
||||
|
||||
|
||||
@callback
|
||||
def async_migrate_rpc_sensor_description_unique_ids(
|
||||
entity_entry: er.RegistryEntry,
|
||||
) -> dict[str, Any] | None:
|
||||
"""Migrate RPC sensor unique_ids after sensor description key rename."""
|
||||
unique_id_map = {
|
||||
"-temperature_0": "-temperature_tc",
|
||||
"-humidity_0": "-humidity_rh",
|
||||
}
|
||||
|
||||
for old_suffix, new_suffix in unique_id_map.items():
|
||||
if entity_entry.unique_id.endswith(old_suffix):
|
||||
new_unique_id = entity_entry.unique_id.removesuffix(old_suffix) + new_suffix
|
||||
LOGGER.debug(
|
||||
"Migrating unique_id for %s entity from [%s] to [%s]",
|
||||
entity_entry.entity_id,
|
||||
entity_entry.unique_id,
|
||||
new_unique_id,
|
||||
)
|
||||
return {"new_unique_id": new_unique_id}
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@callback
|
||||
def async_migrate_rpc_virtual_components_unique_ids(
|
||||
config: dict[str, Any], entity_entry: er.RegistryEntry
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
import asyncio
|
||||
from collections.abc import Callable
|
||||
from functools import partial
|
||||
from typing import Final
|
||||
from typing import Any, Final
|
||||
|
||||
from aiohttp import ClientError, ClientResponseError
|
||||
from tesla_fleet_api.const import Scope
|
||||
@@ -106,7 +106,7 @@ async def _get_access_token(oauth_session: OAuth2Session) -> str:
|
||||
translation_domain=DOMAIN,
|
||||
translation_key="not_ready_connection_error",
|
||||
) from err
|
||||
return oauth_session.token[CONF_ACCESS_TOKEN]
|
||||
return str(oauth_session.token[CONF_ACCESS_TOKEN])
|
||||
|
||||
|
||||
async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -> bool:
|
||||
@@ -227,7 +227,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
|
||||
stream=stream,
|
||||
stream_vehicle=stream_vehicle,
|
||||
vin=vin,
|
||||
firmware=firmware,
|
||||
firmware=firmware or "",
|
||||
device=device,
|
||||
)
|
||||
)
|
||||
@@ -398,10 +398,12 @@ async def async_migrate_entry(
|
||||
return True
|
||||
|
||||
|
||||
def create_handle_vehicle_stream(vin: str, coordinator) -> Callable[[dict], None]:
|
||||
def create_handle_vehicle_stream(
|
||||
vin: str, coordinator: TeslemetryVehicleDataCoordinator
|
||||
) -> Callable[[dict[str, Any]], None]:
|
||||
"""Create a handle vehicle stream function."""
|
||||
|
||||
def handle_vehicle_stream(data: dict) -> None:
|
||||
def handle_vehicle_stream(data: dict[str, Any]) -> None:
|
||||
"""Handle vehicle data from the stream."""
|
||||
if "vehicle_data" in data:
|
||||
LOGGER.debug("Streaming received vehicle data from %s", vin)
|
||||
@@ -450,7 +452,7 @@ def async_setup_energy_device(
|
||||
|
||||
async def async_setup_stream(
|
||||
hass: HomeAssistant, entry: TeslemetryConfigEntry, vehicle: TeslemetryVehicleData
|
||||
):
|
||||
) -> None:
|
||||
"""Set up the stream for a vehicle."""
|
||||
|
||||
await vehicle.stream_vehicle.get_config()
|
||||
|
||||
@@ -329,11 +329,11 @@ class TeslemetryStreamingClimateEntity(
|
||||
)
|
||||
)
|
||||
|
||||
def _async_handle_inside_temp(self, data: float | None):
|
||||
def _async_handle_inside_temp(self, data: float | None) -> None:
|
||||
self._attr_current_temperature = data
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_hvac_power(self, data: str | None):
|
||||
def _async_handle_hvac_power(self, data: str | None) -> None:
|
||||
self._attr_hvac_mode = (
|
||||
None
|
||||
if data is None
|
||||
@@ -343,15 +343,15 @@ class TeslemetryStreamingClimateEntity(
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_climate_keeper_mode(self, data: str | None):
|
||||
def _async_handle_climate_keeper_mode(self, data: str | None) -> None:
|
||||
self._attr_preset_mode = PRESET_MODES.get(data) if data else None
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_hvac_temperature_request(self, data: float | None):
|
||||
def _async_handle_hvac_temperature_request(self, data: float | None) -> None:
|
||||
self._attr_target_temperature = data
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_rhd(self, data: bool | None):
|
||||
def _async_handle_rhd(self, data: bool | None) -> None:
|
||||
if data is not None:
|
||||
self.rhd = data
|
||||
|
||||
@@ -538,15 +538,15 @@ class TeslemetryStreamingCabinOverheatProtectionEntity(
|
||||
)
|
||||
)
|
||||
|
||||
def _async_handle_inside_temp(self, value: float | None):
|
||||
def _async_handle_inside_temp(self, value: float | None) -> None:
|
||||
self._attr_current_temperature = value
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_protection_mode(self, value: str | None):
|
||||
def _async_handle_protection_mode(self, value: str | None) -> None:
|
||||
self._attr_hvac_mode = COP_MODES.get(value) if value is not None else None
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_temperature_limit(self, value: str | None):
|
||||
def _async_handle_temperature_limit(self, value: str | None) -> None:
|
||||
self._attr_target_temperature = (
|
||||
COP_LEVELS.get(value) if value is not None else None
|
||||
)
|
||||
|
||||
@@ -70,7 +70,7 @@ class TeslemetryVehicleDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
|
||||
hass: HomeAssistant,
|
||||
config_entry: TeslemetryConfigEntry,
|
||||
api: Vehicle,
|
||||
product: dict,
|
||||
product: dict[str, Any],
|
||||
) -> None:
|
||||
"""Initialize Teslemetry Vehicle Update Coordinator."""
|
||||
super().__init__(
|
||||
@@ -119,7 +119,7 @@ class TeslemetryEnergySiteLiveCoordinator(DataUpdateCoordinator[dict[str, Any]])
|
||||
hass: HomeAssistant,
|
||||
config_entry: TeslemetryConfigEntry,
|
||||
api: EnergySite,
|
||||
data: dict,
|
||||
data: dict[str, Any],
|
||||
) -> None:
|
||||
"""Initialize Teslemetry Energy Site Live coordinator."""
|
||||
super().__init__(
|
||||
@@ -140,7 +140,7 @@ class TeslemetryEnergySiteLiveCoordinator(DataUpdateCoordinator[dict[str, Any]])
|
||||
async def _async_update_data(self) -> dict[str, Any]:
|
||||
"""Update energy site data using Teslemetry API."""
|
||||
try:
|
||||
data = (await self.api.live_status())["response"]
|
||||
data: dict[str, Any] = (await self.api.live_status())["response"]
|
||||
except (InvalidToken, SubscriptionRequired) as e:
|
||||
raise ConfigEntryAuthFailed from e
|
||||
except RETRY_EXCEPTIONS as e:
|
||||
@@ -171,7 +171,7 @@ class TeslemetryEnergySiteInfoCoordinator(DataUpdateCoordinator[dict[str, Any]])
|
||||
hass: HomeAssistant,
|
||||
config_entry: TeslemetryConfigEntry,
|
||||
api: EnergySite,
|
||||
product: dict,
|
||||
product: dict[str, Any],
|
||||
) -> None:
|
||||
"""Initialize Teslemetry Energy Info coordinator."""
|
||||
super().__init__(
|
||||
|
||||
@@ -199,7 +199,7 @@ class TeslemetryStreamingWindowEntity(
|
||||
f"Adding field {signal} to {self.vehicle.vin}",
|
||||
)
|
||||
|
||||
def _handle_stream_update(self, data) -> None:
|
||||
def _handle_stream_update(self, data: dict[str, Any]) -> None:
|
||||
"""Update the entity attributes."""
|
||||
|
||||
change = False
|
||||
|
||||
@@ -28,7 +28,7 @@ class TeslemetryRootEntity(Entity):
|
||||
_attr_has_entity_name = True
|
||||
scoped: bool
|
||||
|
||||
def raise_for_scope(self, scope: Scope):
|
||||
def raise_for_scope(self, scope: Scope) -> None:
|
||||
"""Raise an error if a scope is not available."""
|
||||
if not self.scoped:
|
||||
raise ServiceValidationError(
|
||||
@@ -231,11 +231,12 @@ class TeslemetryWallConnectorEntity(TeslemetryPollingEntity):
|
||||
@property
|
||||
def _value(self) -> StateType:
|
||||
"""Return a specific wall connector value from coordinator data."""
|
||||
return (
|
||||
value: StateType = (
|
||||
self.coordinator.data.get("wall_connectors", {})
|
||||
.get(self.din, {})
|
||||
.get(self.key)
|
||||
)
|
||||
return value
|
||||
|
||||
@property
|
||||
def exists(self) -> bool:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
"""Teslemetry helper functions."""
|
||||
|
||||
from collections.abc import Awaitable
|
||||
from typing import Any
|
||||
|
||||
from tesla_fleet_api.exceptions import TeslaFleetError
|
||||
@@ -30,7 +31,7 @@ def flatten(
|
||||
return result
|
||||
|
||||
|
||||
async def handle_command(command) -> dict[str, Any]:
|
||||
async def handle_command(command: Awaitable[dict[str, Any]]) -> dict[str, Any]:
|
||||
"""Handle a command."""
|
||||
try:
|
||||
result = await command
|
||||
@@ -44,7 +45,7 @@ async def handle_command(command) -> dict[str, Any]:
|
||||
return result
|
||||
|
||||
|
||||
async def handle_vehicle_command(command) -> Any:
|
||||
async def handle_vehicle_command(command: Awaitable[dict[str, Any]]) -> Any:
|
||||
"""Handle a vehicle command."""
|
||||
result = await handle_command(command)
|
||||
if (response := result.get("response")) is None:
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
|
||||
from tesla_fleet_api.const import Scope
|
||||
from tesla_fleet_api.teslemetry import EnergySite, Vehicle
|
||||
@@ -43,7 +43,7 @@ class TeslemetryVehicleData:
|
||||
vin: str
|
||||
firmware: str
|
||||
device: DeviceInfo
|
||||
wakelock = asyncio.Lock()
|
||||
wakelock: asyncio.Lock = field(default_factory=asyncio.Lock)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -66,4 +66,4 @@ rules:
|
||||
# Platinum
|
||||
async-dependency: done
|
||||
inject-websession: done
|
||||
strict-typing: todo
|
||||
strict-typing: done
|
||||
|
||||
@@ -188,7 +188,7 @@ class TeslemetryStreamingUpdateEntity(
|
||||
|
||||
def _async_handle_software_update_download_percent_complete(
|
||||
self, value: float | None
|
||||
):
|
||||
) -> None:
|
||||
"""Handle software update download percent complete."""
|
||||
|
||||
self._download_percentage = round(value) if value is not None else 0
|
||||
@@ -203,20 +203,22 @@ class TeslemetryStreamingUpdateEntity(
|
||||
|
||||
def _async_handle_software_update_installation_percent_complete(
|
||||
self, value: float | None
|
||||
):
|
||||
) -> None:
|
||||
"""Handle software update installation percent complete."""
|
||||
|
||||
self._install_percentage = round(value) if value is not None else 0
|
||||
self._async_update_progress()
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_software_update_scheduled_start_time(self, value: str | None):
|
||||
def _async_handle_software_update_scheduled_start_time(
|
||||
self, value: str | None
|
||||
) -> None:
|
||||
"""Handle software update scheduled start time."""
|
||||
|
||||
self._attr_in_progress = value is not None
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_software_update_version(self, value: str | None):
|
||||
def _async_handle_software_update_version(self, value: str | None) -> None:
|
||||
"""Handle software update version."""
|
||||
|
||||
self._attr_latest_version = (
|
||||
@@ -224,7 +226,7 @@ class TeslemetryStreamingUpdateEntity(
|
||||
)
|
||||
self.async_write_ha_state()
|
||||
|
||||
def _async_handle_version(self, value: str | None):
|
||||
def _async_handle_version(self, value: str | None) -> None:
|
||||
"""Handle version."""
|
||||
|
||||
if value is not None:
|
||||
|
||||
10
mypy.ini
generated
10
mypy.ini
generated
@@ -5208,6 +5208,16 @@ disallow_untyped_defs = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.teslemetry.*]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
disallow_subclassing_any = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_decorators = true
|
||||
disallow_untyped_defs = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.text.*]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
|
||||
2
requirements_all.txt
generated
2
requirements_all.txt
generated
@@ -1116,7 +1116,7 @@ goslide-api==0.7.0
|
||||
gotailwind==0.3.0
|
||||
|
||||
# homeassistant.components.govee_ble
|
||||
govee-ble==0.44.0
|
||||
govee-ble==1.2.0
|
||||
|
||||
# homeassistant.components.govee_light_local
|
||||
govee-local-api==2.3.0
|
||||
|
||||
2
requirements_test_all.txt
generated
2
requirements_test_all.txt
generated
@@ -992,7 +992,7 @@ goslide-api==0.7.0
|
||||
gotailwind==0.3.0
|
||||
|
||||
# homeassistant.components.govee_ble
|
||||
govee-ble==0.44.0
|
||||
govee-ble==1.2.0
|
||||
|
||||
# homeassistant.components.govee_light_local
|
||||
govee-local-api==2.3.0
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
from collections.abc import AsyncGenerator, Generator, Iterable
|
||||
import datetime
|
||||
from unittest.mock import AsyncMock, patch
|
||||
from unittest.mock import DEFAULT, AsyncMock, patch
|
||||
|
||||
from anthropic.pagination import AsyncPage
|
||||
from anthropic.types import (
|
||||
@@ -239,8 +239,10 @@ def mock_create_stream() -> Generator[AsyncMock]:
|
||||
"anthropic.resources.messages.AsyncMessages.create",
|
||||
new_callable=AsyncMock,
|
||||
) as mock_create:
|
||||
mock_create.side_effect = lambda **kwargs: mock_generator(
|
||||
mock_create.return_value.pop(0), **kwargs
|
||||
mock_create.side_effect = lambda **kwargs: (
|
||||
mock_generator(mock_create.return_value.pop(0), **kwargs)
|
||||
if isinstance(mock_create.return_value, list)
|
||||
else DEFAULT
|
||||
)
|
||||
|
||||
yield mock_create
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from anthropic.types import Message, TextBlock, Usage
|
||||
from freezegun import freeze_time
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
@@ -71,7 +72,6 @@ async def test_empty_data(
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_init_component,
|
||||
mock_create_stream: AsyncMock,
|
||||
entity_registry: er.EntityRegistry,
|
||||
) -> None:
|
||||
"""Test AI Task data generation but the data returned is empty."""
|
||||
mock_create_stream.return_value = [create_content_block(0, [""])]
|
||||
@@ -87,6 +87,31 @@ async def test_empty_data(
|
||||
)
|
||||
|
||||
|
||||
async def test_stream_wrong_type(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_init_component,
|
||||
mock_create_stream: AsyncMock,
|
||||
) -> None:
|
||||
"""Test error if the response is not a stream."""
|
||||
mock_create_stream.return_value = Message(
|
||||
type="message",
|
||||
id="message_id",
|
||||
model="claude-opus-4-6",
|
||||
role="assistant",
|
||||
content=[TextBlock(type="text", text="This is not a stream")],
|
||||
usage=Usage(input_tokens=42, output_tokens=42),
|
||||
)
|
||||
|
||||
with pytest.raises(HomeAssistantError, match="Expected a stream of messages"):
|
||||
await ai_task.async_generate_data(
|
||||
hass,
|
||||
task_name="Test Task",
|
||||
entity_id="ai_task.claude_ai_task",
|
||||
instructions="Generate test data",
|
||||
)
|
||||
|
||||
|
||||
@freeze_time("2026-01-01 12:00:00")
|
||||
async def test_generate_structured_data_legacy(
|
||||
hass: HomeAssistant,
|
||||
|
||||
@@ -8,10 +8,13 @@ from anthropic import AuthenticationError, RateLimitError
|
||||
from anthropic.types import (
|
||||
CitationsWebSearchResultLocation,
|
||||
CitationWebSearchResultLocationParam,
|
||||
Message,
|
||||
TextBlock,
|
||||
TextEditorCodeExecutionCreateResultBlock,
|
||||
TextEditorCodeExecutionStrReplaceResultBlock,
|
||||
TextEditorCodeExecutionToolResultError,
|
||||
TextEditorCodeExecutionViewResultBlock,
|
||||
Usage,
|
||||
WebSearchResultBlock,
|
||||
)
|
||||
from anthropic.types.text_editor_code_execution_tool_result_block import (
|
||||
@@ -584,6 +587,68 @@ async def test_refusal(
|
||||
)
|
||||
|
||||
|
||||
async def test_stream_wrong_type(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
mock_init_component,
|
||||
mock_create_stream: AsyncMock,
|
||||
) -> None:
|
||||
"""Test error if the response is not a stream."""
|
||||
mock_create_stream.return_value = Message(
|
||||
type="message",
|
||||
id="message_id",
|
||||
model="claude-opus-4-6",
|
||||
role="assistant",
|
||||
content=[TextBlock(type="text", text="This is not a stream")],
|
||||
usage=Usage(input_tokens=42, output_tokens=42),
|
||||
)
|
||||
|
||||
result = await conversation.async_converse(
|
||||
hass,
|
||||
"Hi",
|
||||
None,
|
||||
Context(),
|
||||
agent_id="conversation.claude_conversation",
|
||||
)
|
||||
|
||||
assert result.response.response_type == intent.IntentResponseType.ERROR
|
||||
assert result.response.error_code == "unknown"
|
||||
assert result.response.speech["plain"]["speech"] == "Expected a stream of messages"
|
||||
|
||||
|
||||
async def test_double_system_messages(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry_with_assist: MockConfigEntry,
|
||||
mock_init_component,
|
||||
mock_create_stream: AsyncMock,
|
||||
) -> None:
|
||||
"""Test error for two or more system prompts."""
|
||||
conversation_id = "conversation_id"
|
||||
with (
|
||||
chat_session.async_get_chat_session(hass, conversation_id) as session,
|
||||
conversation.async_get_chat_log(hass, session) as chat_log,
|
||||
):
|
||||
chat_log.content = [
|
||||
conversation.chat_log.SystemContent("You are a helpful assistant."),
|
||||
conversation.chat_log.SystemContent("And I am the user."),
|
||||
]
|
||||
|
||||
result = await conversation.async_converse(
|
||||
hass,
|
||||
"What time is it?",
|
||||
conversation_id,
|
||||
Context(),
|
||||
agent_id="conversation.claude_conversation",
|
||||
)
|
||||
|
||||
assert result.response.response_type == intent.IntentResponseType.ERROR
|
||||
assert result.response.error_code == "unknown"
|
||||
assert (
|
||||
result.response.speech["plain"]["speech"]
|
||||
== "Unexpected content type in chat log"
|
||||
)
|
||||
|
||||
|
||||
async def test_extended_thinking(
|
||||
hass: HomeAssistant,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -5,6 +5,7 @@ from __future__ import annotations
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
from powerfox import PowerfoxAuthenticationError, PowerfoxConnectionError
|
||||
import pytest
|
||||
|
||||
from homeassistant.config_entries import ConfigEntryState
|
||||
from homeassistant.core import HomeAssistant
|
||||
@@ -45,16 +46,20 @@ async def test_config_entry_not_ready(
|
||||
assert mock_config_entry.state is ConfigEntryState.SETUP_RETRY
|
||||
|
||||
|
||||
async def test_setup_entry_exception(
|
||||
@pytest.mark.parametrize("method", ["all_devices", "device"])
|
||||
async def test_config_entry_auth_failed(
|
||||
hass: HomeAssistant,
|
||||
mock_powerfox_client: AsyncMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
method: str,
|
||||
) -> None:
|
||||
"""Test ConfigEntryNotReady when API raises an exception during entry setup."""
|
||||
"""Test ConfigEntryAuthFailed when authentication fails."""
|
||||
getattr(mock_powerfox_client, method).side_effect = PowerfoxAuthenticationError
|
||||
mock_config_entry.add_to_hass(hass)
|
||||
mock_powerfox_client.device.side_effect = PowerfoxAuthenticationError
|
||||
|
||||
await hass.config_entries.async_setup(mock_config_entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR
|
||||
|
||||
flows = hass.config_entries.flow.async_progress()
|
||||
|
||||
@@ -6,7 +6,12 @@ from datetime import timedelta
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
from powerfox import DeviceReport, PowerfoxConnectionError
|
||||
from powerfox import (
|
||||
DeviceReport,
|
||||
PowerfoxConnectionError,
|
||||
PowerfoxNoDataError,
|
||||
PowerfoxPrivacyError,
|
||||
)
|
||||
import pytest
|
||||
from syrupy.assertion import SnapshotAssertion
|
||||
|
||||
@@ -35,11 +40,16 @@ async def test_all_sensors(
|
||||
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"exception",
|
||||
[PowerfoxConnectionError, PowerfoxNoDataError, PowerfoxPrivacyError],
|
||||
)
|
||||
async def test_update_failed(
|
||||
hass: HomeAssistant,
|
||||
mock_powerfox_client: AsyncMock,
|
||||
mock_config_entry: MockConfigEntry,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
exception: Exception,
|
||||
) -> None:
|
||||
"""Test entities become unavailable after failed update."""
|
||||
await setup_integration(hass, mock_config_entry)
|
||||
@@ -47,7 +57,7 @@ async def test_update_failed(
|
||||
|
||||
assert hass.states.get("sensor.poweropti_energy_usage").state is not None
|
||||
|
||||
mock_powerfox_client.device.side_effect = PowerfoxConnectionError
|
||||
mock_powerfox_client.device.side_effect = exception
|
||||
freezer.tick(timedelta(minutes=5))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
@@ -5902,7 +5902,7 @@
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': None,
|
||||
'unique_id': '123456789ABC-humidity:0-humidity_0',
|
||||
'unique_id': '123456789ABC-humidity:0-humidity_rh',
|
||||
'unit_of_measurement': '%',
|
||||
})
|
||||
# ---
|
||||
@@ -6178,7 +6178,7 @@
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': None,
|
||||
'unique_id': '123456789ABC-temperature:0-temperature_0',
|
||||
'unique_id': '123456789ABC-temperature:0-temperature_tc',
|
||||
'unit_of_measurement': <UnitOfTemperature.CELSIUS: '°C'>,
|
||||
})
|
||||
# ---
|
||||
@@ -11364,7 +11364,7 @@
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': None,
|
||||
'unique_id': '123456789ABC-temperature:0-temperature_0',
|
||||
'unique_id': '123456789ABC-temperature:0-temperature_tc',
|
||||
'unit_of_measurement': <UnitOfTemperature.CELSIUS: '°C'>,
|
||||
})
|
||||
# ---
|
||||
|
||||
@@ -616,7 +616,7 @@ async def test_rpc_update_entry_sleep_period(
|
||||
hass,
|
||||
SENSOR_DOMAIN,
|
||||
"test_name_temperature",
|
||||
"temperature:0-temperature_0",
|
||||
"temperature:0-temperature_tc",
|
||||
entry,
|
||||
)
|
||||
|
||||
@@ -650,7 +650,7 @@ async def test_rpc_sleeping_device_no_periodic_updates(
|
||||
hass,
|
||||
SENSOR_DOMAIN,
|
||||
"test_name_temperature",
|
||||
"temperature:0-temperature_0",
|
||||
"temperature:0-temperature_tc",
|
||||
entry,
|
||||
)
|
||||
|
||||
|
||||
@@ -637,9 +637,6 @@ async def test_rpc_sleeping_sensor(
|
||||
monkeypatch.setitem(mock_rpc_device.status["sys"], "wakeup_period", 1000)
|
||||
await init_integration(hass, 2, sleep_period=1000)
|
||||
|
||||
# Sensor should be created when device is online
|
||||
assert hass.states.get(entity_id) is None
|
||||
|
||||
# Make device online
|
||||
mock_rpc_device.mock_online()
|
||||
await hass.async_block_till_done(wait_background_tasks=True)
|
||||
@@ -669,9 +666,6 @@ async def test_rpc_sleeping_sensor_with_channel_name(
|
||||
monkeypatch.setitem(mock_rpc_device.status["sys"], "wakeup_period", 1000)
|
||||
await init_integration(hass, 2, sleep_period=1000)
|
||||
|
||||
# Sensor should be created when device is online
|
||||
assert hass.states.get(entity_id) is None
|
||||
|
||||
# Make device online
|
||||
mock_rpc_device.mock_online()
|
||||
await hass.async_block_till_done(wait_background_tasks=True)
|
||||
@@ -700,7 +694,7 @@ async def test_rpc_restored_sleeping_sensor(
|
||||
hass,
|
||||
SENSOR_DOMAIN,
|
||||
"test_name_temperature",
|
||||
"temperature:0-temperature_0",
|
||||
"temperature:0-temperature_tc",
|
||||
entry,
|
||||
device_id=device.id,
|
||||
)
|
||||
@@ -747,7 +741,7 @@ async def test_rpc_restored_sleeping_sensor_no_last_state(
|
||||
hass,
|
||||
SENSOR_DOMAIN,
|
||||
"test_name_temperature",
|
||||
"temperature:0-temperature_0",
|
||||
"temperature:0-temperature_tc",
|
||||
entry,
|
||||
device_id=device.id,
|
||||
)
|
||||
@@ -824,9 +818,6 @@ async def test_rpc_sleeping_update_entity_service(
|
||||
monkeypatch.setitem(mock_rpc_device.status["sys"], "wakeup_period", 1000)
|
||||
await init_integration(hass, 2, sleep_period=1000)
|
||||
|
||||
# Entity should be created when device is online
|
||||
assert hass.states.get(entity_id) is None
|
||||
|
||||
# Make device online
|
||||
mock_rpc_device.mock_online()
|
||||
await hass.async_block_till_done(wait_background_tasks=True)
|
||||
@@ -846,7 +837,7 @@ async def test_rpc_sleeping_update_entity_service(
|
||||
assert state.state == "22.9"
|
||||
|
||||
assert (entry := entity_registry.async_get(entity_id))
|
||||
assert entry.unique_id == "123456789ABC-temperature:0-temperature_0"
|
||||
assert entry.unique_id == "123456789ABC-temperature:0-temperature_tc"
|
||||
|
||||
assert (
|
||||
"Entity sensor.test_name_temperature comes from a sleeping device"
|
||||
@@ -1219,6 +1210,54 @@ async def test_migrate_unique_id_virtual_components_roles(
|
||||
assert "Migrating unique_id for sensor.test_name_test_sensor" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("old_unique_id", "new_unique_id", "entity_id"),
|
||||
[
|
||||
(
|
||||
"123456789ABC-temperature:0-temperature_0",
|
||||
"123456789ABC-temperature:0-temperature_tc",
|
||||
"sensor.test_name_temperature",
|
||||
),
|
||||
(
|
||||
"123456789ABC-humidity:0-humidity_0",
|
||||
"123456789ABC-humidity:0-humidity_rh",
|
||||
"sensor.test_name_humidity",
|
||||
),
|
||||
],
|
||||
)
|
||||
@pytest.mark.usefixtures("disable_async_remove_shelly_rpc_entities")
|
||||
async def test_migrate_unique_id_rpc_sensor_description_key_rename(
|
||||
hass: HomeAssistant,
|
||||
mock_rpc_device: Mock,
|
||||
entity_registry: EntityRegistry,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
old_unique_id: str,
|
||||
new_unique_id: str,
|
||||
entity_id: str,
|
||||
) -> None:
|
||||
"""Test migration of RPC sensor unique_id after description key rename."""
|
||||
entry = await init_integration(hass, 2, skip_setup=True)
|
||||
|
||||
entity = entity_registry.async_get_or_create(
|
||||
suggested_object_id=entity_id.split(".")[1],
|
||||
disabled_by=None,
|
||||
domain=SENSOR_DOMAIN,
|
||||
platform=DOMAIN,
|
||||
unique_id=old_unique_id,
|
||||
config_entry=entry,
|
||||
)
|
||||
assert entity.unique_id == old_unique_id
|
||||
|
||||
await hass.config_entries.async_setup(entry.entry_id)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
entity_entry = entity_registry.async_get(entity_id)
|
||||
assert entity_entry
|
||||
assert entity_entry.unique_id == new_unique_id
|
||||
|
||||
assert f"Migrating unique_id for {entity_id} entity" in caplog.text
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("disable_async_remove_shelly_rpc_entities")
|
||||
async def test_rpc_remove_text_virtual_sensor_when_mode_field(
|
||||
hass: HomeAssistant,
|
||||
|
||||
Reference in New Issue
Block a user