Compare commits

...

24 Commits

Author SHA1 Message Date
Jan Bouwhuis
87b83dcc1b Remove the MQTT object_id option after 6 months of deprecation (#164460)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-28 20:12:23 +01:00
Erik Montnemery
be9b47539d Revert "Remove unnecessary volume_up/volume_down overrides from frontier_silicon media player" (#164463) 2026-02-28 20:11:52 +01:00
Joost Lekkerkerker
be6ddc314c Add sound detection switch to SmartThings (#164470) 2026-02-28 20:11:13 +01:00
David Bonnes
c6f8a7b7e4 Harden test of an invalid service call for Evohome (#164458) 2026-02-28 20:10:11 +01:00
Joost Lekkerkerker
53da5612e9 Add fan speed to SmartThings vacuum (#164452)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-28 20:09:43 +01:00
Michael Davie
6cc56b76f9 Bump env-canada to 0.13.2 (#164480)
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 20:08:17 +01:00
Tom Matheussen
03cb65d555 Require user code to be set when toggling Satel Integra switches (#164483) 2026-02-28 20:06:56 +01:00
Abílio Costa
73dd024933 Add merged PR count sensor to Github integration (#164405) 2026-02-28 15:13:17 +01:00
Barry vd. Heuvel
1c8c92bf8f Bump weheat to 2026.2.28 (#164456) 2026-02-28 14:40:58 +01:00
Khole
7e041a6759 Hive - Bump pyhive-integration to v1.0.8 (#164453) 2026-02-28 12:32:37 +00:00
Alex Brown
ee05f14530 Add Matter lock user and credential management services (#161936)
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-28 10:43:09 +01:00
Simone Chemelli
f0ba5178b7 Fix RpcSensorDescription for Shelly (#150719) 2026-02-28 09:28:53 +01:00
Denis Shulyaka
df51ac932b Improve Anthropic service exceptions (#164418)
Co-authored-by: Abílio Costa <abmantis@users.noreply.github.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-28 09:20:17 +01:00
Paulus Schoutsen
e96b5f2eb1 Remove unnecessary volume_up/volume_down overrides from mpd media player (#164428)
Co-authored-by: Claude <noreply@anthropic.com>
2026-02-28 09:16:53 +01:00
Paulus Schoutsen
4e59c89327 Remove unnecessary volume_up/volume_down overrides from bluesound media player (#164426)
Co-authored-by: Claude <noreply@anthropic.com>
2026-02-28 08:57:53 +01:00
Paulus Schoutsen
15676021a9 Remove unnecessary volume_up/volume_down overrides from demo media player (#164424)
Co-authored-by: Claude <noreply@anthropic.com>
2026-02-28 08:57:30 +01:00
Paulus Schoutsen
d3197a0d1e Remove unnecessary volume_up/volume_down overrides from aquostv media player (#164431)
Co-authored-by: Claude <noreply@anthropic.com>
2026-02-28 08:56:09 +01:00
Paulus Schoutsen
35692b335c Remove unnecessary volume_up/volume_down overrides from frontier_silicon media player (#164430)
Co-authored-by: Claude <noreply@anthropic.com>
2026-02-28 08:49:47 +01:00
Paulus Schoutsen
cc5c810501 Remove unnecessary volume_up/volume_down overrides from NADtcp media player (#164434)
Co-authored-by: Claude <noreply@anthropic.com>
2026-02-28 08:47:08 +01:00
Paulus Schoutsen
f2681f2dc8 Remove unnecessary volume_up/volume_down overrides from monoprice media player (#164429)
Co-authored-by: Claude <noreply@anthropic.com>
2026-02-28 08:45:43 +01:00
Brett Adams
fe0a22c790 Complete strict typing for Teslemetry integration (#164416) 2026-02-28 08:33:45 +01:00
Norman Yee
186ab50458 Bump govee-ble to 1.2.0 (#164438) 2026-02-28 08:24:38 +01:00
mettolen
b524c40176 Remove error translation placeholders from Airobot (#164436) 2026-02-28 06:18:19 +01:00
Klaas Schoute
642864959a Update translatable exceptions for Powerfox integration (#164322) 2026-02-28 01:57:02 +00:00
74 changed files with 4326 additions and 590 deletions

View File

@@ -545,6 +545,7 @@ homeassistant.components.tcp.*
homeassistant.components.technove.*
homeassistant.components.tedee.*
homeassistant.components.telegram_bot.*
homeassistant.components.teslemetry.*
homeassistant.components.text.*
homeassistant.components.thethingsnetwork.*
homeassistant.components.threshold.*

View File

@@ -93,7 +93,6 @@ class AirobotNumber(AirobotEntity, NumberEntity):
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="set_value_failed",
translation_placeholders={"error": str(err)},
) from err
else:
await self.coordinator.async_request_refresh()

View File

@@ -112,7 +112,7 @@
"message": "Failed to set temperature to {temperature}."
},
"set_value_failed": {
"message": "Failed to set value: {error}"
"message": "Failed to set value."
},
"switch_turn_off_failed": {
"message": "Failed to turn off {switch}."

View File

@@ -400,8 +400,8 @@ def _convert_content(
# If there is only one text block, simplify the content to a string
messages[-1]["content"] = messages[-1]["content"][0]["text"]
else:
# Note: We don't pass SystemContent here as its passed to the API as the prompt
raise TypeError(f"Unexpected content type: {type(content)}")
# Note: We don't pass SystemContent here as it's passed to the API as the prompt
raise HomeAssistantError("Unexpected content type in chat log")
return messages, container_id
@@ -442,8 +442,8 @@ async def _transform_stream( # noqa: C901 - This is complex, but better to have
Each message could contain multiple blocks of the same type.
"""
if stream is None:
raise TypeError("Expected a stream of messages")
if stream is None or not hasattr(stream, "__aiter__"):
raise HomeAssistantError("Expected a stream of messages")
current_tool_block: ToolUseBlockParam | ServerToolUseBlockParam | None = None
current_tool_args: str
@@ -456,8 +456,6 @@ async def _transform_stream( # noqa: C901 - This is complex, but better to have
LOGGER.debug("Received response: %s", response)
if isinstance(response, RawMessageStartEvent):
if response.message.role != "assistant":
raise ValueError("Unexpected message role")
input_usage = response.message.usage
first_block = True
elif isinstance(response, RawContentBlockStartEvent):
@@ -666,7 +664,7 @@ class AnthropicBaseLLMEntity(Entity):
system = chat_log.content[0]
if not isinstance(system, conversation.SystemContent):
raise TypeError("First message must be a system message")
raise HomeAssistantError("First message must be a system message")
# System prompt with caching enabled
system_prompt: list[TextBlockParam] = [

View File

@@ -31,10 +31,7 @@ rules:
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: todo
comment: |
Reevaluate exceptions for entity services.
action-exceptions: done
config-entry-unloading: done
docs-configuration-parameters: done
docs-installation-parameters: done

View File

@@ -117,6 +117,7 @@ class SharpAquosTVDevice(MediaPlayerEntity):
| MediaPlayerEntityFeature.VOLUME_SET
| MediaPlayerEntityFeature.PLAY
)
_attr_volume_step = 2 / 60
def __init__(
self, name: str, remote: sharp_aquos_rc.TV, power_on_enabled: bool = False
@@ -161,22 +162,6 @@ class SharpAquosTVDevice(MediaPlayerEntity):
"""Turn off tvplayer."""
self._remote.power(0)
@_retry
def volume_up(self) -> None:
"""Volume up the media player."""
if self.volume_level is None:
_LOGGER.debug("Unknown volume in volume_up")
return
self._remote.volume(int(self.volume_level * 60) + 2)
@_retry
def volume_down(self) -> None:
"""Volume down media player."""
if self.volume_level is None:
_LOGGER.debug("Unknown volume in volume_down")
return
self._remote.volume(int(self.volume_level * 60) - 2)
@_retry
def set_volume_level(self, volume: float) -> None:
"""Set Volume media player."""

View File

@@ -85,6 +85,7 @@ class BluesoundPlayer(CoordinatorEntity[BluesoundCoordinator], MediaPlayerEntity
_attr_media_content_type = MediaType.MUSIC
_attr_has_entity_name = True
_attr_name = None
_attr_volume_step = 0.01
def __init__(
self,
@@ -688,24 +689,6 @@ class BluesoundPlayer(CoordinatorEntity[BluesoundCoordinator], MediaPlayerEntity
await self._player.play_url(url)
async def async_volume_up(self) -> None:
"""Volume up the media player."""
if self.volume_level is None:
return
new_volume = self.volume_level + 0.01
new_volume = min(1, new_volume)
await self.async_set_volume_level(new_volume)
async def async_volume_down(self) -> None:
"""Volume down the media player."""
if self.volume_level is None:
return
new_volume = self.volume_level - 0.01
new_volume = max(0, new_volume)
await self.async_set_volume_level(new_volume)
async def async_set_volume_level(self, volume: float) -> None:
"""Send volume_up command to media player."""
volume = int(round(volume * 100))

View File

@@ -139,18 +139,6 @@ class AbstractDemoPlayer(MediaPlayerEntity):
self._attr_is_volume_muted = mute
self.schedule_update_ha_state()
def volume_up(self) -> None:
"""Increase volume."""
assert self.volume_level is not None
self._attr_volume_level = min(1.0, self.volume_level + 0.1)
self.schedule_update_ha_state()
def volume_down(self) -> None:
"""Decrease volume."""
assert self.volume_level is not None
self._attr_volume_level = max(0.0, self.volume_level - 0.1)
self.schedule_update_ha_state()
def set_volume_level(self, volume: float) -> None:
"""Set the volume level, range 0..1."""
self._attr_volume_level = volume

View File

@@ -7,5 +7,5 @@
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["env_canada"],
"requirements": ["env-canada==0.12.4"]
"requirements": ["env-canada==0.13.2"]
}

View File

@@ -78,6 +78,12 @@ query ($owner: String!, $repository: String!) {
number
}
}
merged_pull_request: pullRequests(
first:1
states: MERGED
) {
total: totalCount
}
release: latestRelease {
name
url

View File

@@ -28,6 +28,9 @@
"latest_tag": {
"default": "mdi:tag"
},
"merged_pulls_count": {
"default": "mdi:source-merge"
},
"pulls_count": {
"default": "mdi:source-pull"
},

View File

@@ -75,6 +75,13 @@ SENSOR_DESCRIPTIONS: tuple[GitHubSensorEntityDescription, ...] = (
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda data: data["pull_request"]["total"],
),
GitHubSensorEntityDescription(
key="merged_pulls_count",
translation_key="merged_pulls_count",
entity_category=EntityCategory.DIAGNOSTIC,
state_class=SensorStateClass.TOTAL,
value_fn=lambda data: data["merged_pull_request"]["total"],
),
GitHubSensorEntityDescription(
key="latest_commit",
translation_key="latest_commit",

View File

@@ -48,6 +48,10 @@
"latest_tag": {
"name": "Latest tag"
},
"merged_pulls_count": {
"name": "Merged pull requests",
"unit_of_measurement": "pull requests"
},
"pulls_count": {
"name": "Pull requests",
"unit_of_measurement": "pull requests"

View File

@@ -140,5 +140,5 @@
"documentation": "https://www.home-assistant.io/integrations/govee_ble",
"integration_type": "device",
"iot_class": "local_push",
"requirements": ["govee-ble==0.44.0"]
"requirements": ["govee-ble==1.2.0"]
}

View File

@@ -10,5 +10,5 @@
"integration_type": "hub",
"iot_class": "cloud_polling",
"loggers": ["apyhiveapi"],
"requirements": ["pyhive-integration==1.0.7"]
"requirements": ["pyhive-integration==1.0.8"]
}

View File

@@ -2,6 +2,8 @@
import logging
from chip.clusters import Objects as clusters
ADDON_SLUG = "core_matter_server"
CONF_INTEGRATION_CREATED_ADDON = "integration_created_addon"
@@ -15,3 +17,100 @@ ID_TYPE_DEVICE_ID = "deviceid"
ID_TYPE_SERIAL = "serial"
FEATUREMAP_ATTRIBUTE_ID = 65532
# --- Lock domain constants ---
# Shared field keys
ATTR_CREDENTIAL_RULE = "credential_rule"
ATTR_MAX_CREDENTIALS_PER_USER = "max_credentials_per_user"
ATTR_MAX_PIN_USERS = "max_pin_users"
ATTR_MAX_RFID_USERS = "max_rfid_users"
ATTR_MAX_USERS = "max_users"
ATTR_SUPPORTS_USER_MGMT = "supports_user_management"
ATTR_USER_INDEX = "user_index"
ATTR_USER_NAME = "user_name"
ATTR_USER_STATUS = "user_status"
ATTR_USER_TYPE = "user_type"
# Magic values
CLEAR_ALL_INDEX = 0xFFFE # Matter spec: pass to ClearUser/ClearCredential to clear all
# Timed request timeout for lock commands that modify state.
# 10 seconds accounts for Thread network latency and retransmissions.
LOCK_TIMED_REQUEST_TIMEOUT_MS = 10000
# Credential field keys
ATTR_CREDENTIAL_DATA = "credential_data"
ATTR_CREDENTIAL_INDEX = "credential_index"
ATTR_CREDENTIAL_TYPE = "credential_type"
# Credential type strings
CRED_TYPE_FACE = "face"
CRED_TYPE_FINGERPRINT = "fingerprint"
CRED_TYPE_FINGER_VEIN = "finger_vein"
CRED_TYPE_PIN = "pin"
CRED_TYPE_RFID = "rfid"
# User status mapping (Matter DoorLock UserStatusEnum)
_UserStatus = clusters.DoorLock.Enums.UserStatusEnum
USER_STATUS_MAP: dict[int, str] = {
_UserStatus.kAvailable: "available",
_UserStatus.kOccupiedEnabled: "occupied_enabled",
_UserStatus.kOccupiedDisabled: "occupied_disabled",
}
USER_STATUS_REVERSE_MAP: dict[str, int] = {v: k for k, v in USER_STATUS_MAP.items()}
# User type mapping (Matter DoorLock UserTypeEnum)
_UserType = clusters.DoorLock.Enums.UserTypeEnum
USER_TYPE_MAP: dict[int, str] = {
_UserType.kUnrestrictedUser: "unrestricted_user",
_UserType.kYearDayScheduleUser: "year_day_schedule_user",
_UserType.kWeekDayScheduleUser: "week_day_schedule_user",
_UserType.kProgrammingUser: "programming_user",
_UserType.kNonAccessUser: "non_access_user",
_UserType.kForcedUser: "forced_user",
_UserType.kDisposableUser: "disposable_user",
_UserType.kExpiringUser: "expiring_user",
_UserType.kScheduleRestrictedUser: "schedule_restricted_user",
_UserType.kRemoteOnlyUser: "remote_only_user",
}
USER_TYPE_REVERSE_MAP: dict[str, int] = {v: k for k, v in USER_TYPE_MAP.items()}
# Credential type mapping (Matter DoorLock CredentialTypeEnum)
_CredentialType = clusters.DoorLock.Enums.CredentialTypeEnum
CREDENTIAL_TYPE_MAP: dict[int, str] = {
_CredentialType.kProgrammingPIN: "programming_pin",
_CredentialType.kPin: CRED_TYPE_PIN,
_CredentialType.kRfid: CRED_TYPE_RFID,
_CredentialType.kFingerprint: CRED_TYPE_FINGERPRINT,
_CredentialType.kFingerVein: CRED_TYPE_FINGER_VEIN,
_CredentialType.kFace: CRED_TYPE_FACE,
_CredentialType.kAliroCredentialIssuerKey: "aliro_credential_issuer_key",
_CredentialType.kAliroEvictableEndpointKey: "aliro_evictable_endpoint_key",
_CredentialType.kAliroNonEvictableEndpointKey: "aliro_non_evictable_endpoint_key",
}
# Credential rule mapping (Matter DoorLock CredentialRuleEnum)
_CredentialRule = clusters.DoorLock.Enums.CredentialRuleEnum
CREDENTIAL_RULE_MAP: dict[int, str] = {
_CredentialRule.kSingle: "single",
_CredentialRule.kDual: "dual",
_CredentialRule.kTri: "tri",
}
CREDENTIAL_RULE_REVERSE_MAP: dict[str, int] = {
v: k for k, v in CREDENTIAL_RULE_MAP.items()
}
# Reverse mapping for credential types (str -> int)
CREDENTIAL_TYPE_REVERSE_MAP: dict[str, int] = {
v: k for k, v in CREDENTIAL_TYPE_MAP.items()
}
# Credential types allowed in set/clear services (excludes programming_pin, aliro_*)
SERVICE_CREDENTIAL_TYPES = [
CRED_TYPE_PIN,
CRED_TYPE_RFID,
CRED_TYPE_FINGERPRINT,
CRED_TYPE_FINGER_VEIN,
CRED_TYPE_FACE,
]

View File

@@ -174,6 +174,27 @@
}
},
"services": {
"clear_lock_credential": {
"service": "mdi:key-remove"
},
"clear_lock_user": {
"service": "mdi:account-remove"
},
"get_lock_credential_status": {
"service": "mdi:key-chain"
},
"get_lock_info": {
"service": "mdi:lock-question"
},
"get_lock_users": {
"service": "mdi:account-multiple"
},
"set_lock_credential": {
"service": "mdi:key-plus"
},
"set_lock_user": {
"service": "mdi:account-lock"
},
"water_heater_boost": {
"service": "mdi:water-boiler"
}

View File

@@ -7,6 +7,7 @@ from dataclasses import dataclass
from typing import Any
from chip.clusters import Objects as clusters
from matter_server.common.errors import MatterError
from matter_server.common.models import EventType, MatterNodeEvent
from homeassistant.components.lock import (
@@ -17,32 +18,56 @@ from homeassistant.components.lock import (
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_CODE, Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import LOGGER
from .const import (
ATTR_CREDENTIAL_DATA,
ATTR_CREDENTIAL_INDEX,
ATTR_CREDENTIAL_RULE,
ATTR_CREDENTIAL_TYPE,
ATTR_USER_INDEX,
ATTR_USER_NAME,
ATTR_USER_STATUS,
ATTR_USER_TYPE,
LOCK_TIMED_REQUEST_TIMEOUT_MS,
LOGGER,
)
from .entity import MatterEntity, MatterEntityDescription
from .helpers import get_matter
from .lock_helpers import (
DoorLockFeature,
GetLockCredentialStatusResult,
GetLockInfoResult,
GetLockUsersResult,
SetLockCredentialResult,
clear_lock_credential,
clear_lock_user,
get_lock_credential_status,
get_lock_info,
get_lock_users,
set_lock_credential,
set_lock_user,
)
from .models import MatterDiscoverySchema
DOOR_LOCK_OPERATION_SOURCE = {
# mapping from operation source id's to textual representation
0: "Unspecified",
1: "Manual", # [Optional]
2: "Proprietary Remote", # [Optional]
3: "Keypad", # [Optional]
4: "Auto", # [Optional]
5: "Button", # [Optional]
6: "Schedule", # [HDSCH]
7: "Remote", # [M]
8: "RFID", # [RID]
9: "Biometric", # [USR]
10: "Aliro", # [Aliro]
# Door lock operation source mapping (Matter DoorLock OperationSourceEnum)
_OperationSource = clusters.DoorLock.Enums.OperationSourceEnum
DOOR_LOCK_OPERATION_SOURCE: dict[int, str] = {
_OperationSource.kUnspecified: "Unspecified",
_OperationSource.kManual: "Manual",
_OperationSource.kProprietaryRemote: "Proprietary Remote",
_OperationSource.kKeypad: "Keypad",
_OperationSource.kAuto: "Auto",
_OperationSource.kButton: "Button",
_OperationSource.kSchedule: "Schedule",
_OperationSource.kRemote: "Remote",
_OperationSource.kRfid: "RFID",
_OperationSource.kBiometric: "Biometric",
_OperationSource.kAliro: "Aliro",
}
DoorLockFeature = clusters.DoorLock.Bitmaps.Feature
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
@@ -98,17 +123,15 @@ class MatterLock(MatterEntity, LockEntity):
node_event.data,
)
# handle the DoorLock events
# Handle the DoorLock events
node_event_data: dict[str, int] = node_event.data or {}
match node_event.event_id:
case (
clusters.DoorLock.Events.LockOperation.event_id
): # Lock cluster event 2
# update the changed_by attribute to indicate lock operation source
case clusters.DoorLock.Events.LockOperation.event_id:
operation_source: int = node_event_data.get("operationSource", -1)
self._attr_changed_by = DOOR_LOCK_OPERATION_SOURCE.get(
source_name = DOOR_LOCK_OPERATION_SOURCE.get(
operation_source, "Unknown"
)
self._attr_changed_by = source_name
self.async_write_ha_state()
@property
@@ -146,7 +169,7 @@ class MatterLock(MatterEntity, LockEntity):
code_bytes = code.encode() if code else None
await self.send_device_command(
command=clusters.DoorLock.Commands.LockDoor(code_bytes),
timed_request_timeout_ms=1000,
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
async def async_unlock(self, **kwargs: Any) -> None:
@@ -168,12 +191,12 @@ class MatterLock(MatterEntity, LockEntity):
# and unlatch on the HA 'open' command.
await self.send_device_command(
command=clusters.DoorLock.Commands.UnboltDoor(code_bytes),
timed_request_timeout_ms=1000,
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
else:
await self.send_device_command(
command=clusters.DoorLock.Commands.UnlockDoor(code_bytes),
timed_request_timeout_ms=1000,
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
async def async_open(self, **kwargs: Any) -> None:
@@ -190,7 +213,7 @@ class MatterLock(MatterEntity, LockEntity):
code_bytes = code.encode() if code else None
await self.send_device_command(
command=clusters.DoorLock.Commands.UnlockDoor(code_bytes),
timed_request_timeout_ms=1000,
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
@callback
@@ -256,6 +279,109 @@ class MatterLock(MatterEntity, LockEntity):
supported_features |= LockEntityFeature.OPEN
self._attr_supported_features = supported_features
# --- Entity service methods ---
async def async_set_lock_user(self, **kwargs: Any) -> None:
"""Set a lock user (full CRUD)."""
try:
await set_lock_user(
self.matter_client,
self._endpoint.node,
user_index=kwargs.get(ATTR_USER_INDEX),
user_name=kwargs.get(ATTR_USER_NAME),
user_type=kwargs.get(ATTR_USER_TYPE),
credential_rule=kwargs.get(ATTR_CREDENTIAL_RULE),
)
except MatterError as err:
raise HomeAssistantError(
f"Failed to set lock user on {self.entity_id}: {err}"
) from err
async def async_clear_lock_user(self, **kwargs: Any) -> None:
"""Clear a lock user."""
try:
await clear_lock_user(
self.matter_client,
self._endpoint.node,
kwargs[ATTR_USER_INDEX],
)
except MatterError as err:
raise HomeAssistantError(
f"Failed to clear lock user on {self.entity_id}: {err}"
) from err
async def async_get_lock_info(self) -> GetLockInfoResult:
"""Get lock capabilities and configuration info."""
try:
return await get_lock_info(
self.matter_client,
self._endpoint.node,
)
except MatterError as err:
raise HomeAssistantError(
f"Failed to get lock info for {self.entity_id}: {err}"
) from err
async def async_get_lock_users(self) -> GetLockUsersResult:
"""Get all users from the lock."""
try:
return await get_lock_users(
self.matter_client,
self._endpoint.node,
)
except MatterError as err:
raise HomeAssistantError(
f"Failed to get lock users for {self.entity_id}: {err}"
) from err
async def async_set_lock_credential(self, **kwargs: Any) -> SetLockCredentialResult:
"""Set a credential on the lock."""
try:
return await set_lock_credential(
self.matter_client,
self._endpoint.node,
credential_type=kwargs[ATTR_CREDENTIAL_TYPE],
credential_data=kwargs[ATTR_CREDENTIAL_DATA],
credential_index=kwargs.get(ATTR_CREDENTIAL_INDEX),
user_index=kwargs.get(ATTR_USER_INDEX),
user_status=kwargs.get(ATTR_USER_STATUS),
user_type=kwargs.get(ATTR_USER_TYPE),
)
except MatterError as err:
raise HomeAssistantError(
f"Failed to set lock credential on {self.entity_id}: {err}"
) from err
async def async_clear_lock_credential(self, **kwargs: Any) -> None:
"""Clear a credential from the lock."""
try:
await clear_lock_credential(
self.matter_client,
self._endpoint.node,
credential_type=kwargs[ATTR_CREDENTIAL_TYPE],
credential_index=kwargs[ATTR_CREDENTIAL_INDEX],
)
except MatterError as err:
raise HomeAssistantError(
f"Failed to clear lock credential on {self.entity_id}: {err}"
) from err
async def async_get_lock_credential_status(
self, **kwargs: Any
) -> GetLockCredentialStatusResult:
"""Get the status of a credential slot on the lock."""
try:
return await get_lock_credential_status(
self.matter_client,
self._endpoint.node,
credential_type=kwargs[ATTR_CREDENTIAL_TYPE],
credential_index=kwargs[ATTR_CREDENTIAL_INDEX],
)
except MatterError as err:
raise HomeAssistantError(
f"Failed to get credential status for {self.entity_id}: {err}"
) from err
DISCOVERY_SCHEMAS = [
MatterDiscoverySchema(

View File

@@ -0,0 +1,881 @@
"""Lock-specific helpers for the Matter integration.
Provides DoorLock cluster endpoint resolution, feature detection, and
business logic for lock user/credential management.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, TypedDict
from chip.clusters import Objects as clusters
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from .const import (
CLEAR_ALL_INDEX,
CRED_TYPE_FACE,
CRED_TYPE_FINGER_VEIN,
CRED_TYPE_FINGERPRINT,
CRED_TYPE_PIN,
CRED_TYPE_RFID,
CREDENTIAL_RULE_MAP,
CREDENTIAL_RULE_REVERSE_MAP,
CREDENTIAL_TYPE_MAP,
CREDENTIAL_TYPE_REVERSE_MAP,
LOCK_TIMED_REQUEST_TIMEOUT_MS,
USER_STATUS_MAP,
USER_STATUS_REVERSE_MAP,
USER_TYPE_MAP,
USER_TYPE_REVERSE_MAP,
)
# Error translation keys (used in ServiceValidationError/HomeAssistantError)
ERR_CREDENTIAL_TYPE_NOT_SUPPORTED = "credential_type_not_supported"
ERR_INVALID_CREDENTIAL_DATA = "invalid_credential_data"
# SetCredential response status mapping (Matter DlStatus)
_DlStatus = clusters.DoorLock.Enums.DlStatus
SET_CREDENTIAL_STATUS_MAP: dict[int, str] = {
_DlStatus.kSuccess: "success",
_DlStatus.kFailure: "failure",
_DlStatus.kDuplicate: "duplicate",
_DlStatus.kOccupied: "occupied",
}
if TYPE_CHECKING:
from matter_server.client import MatterClient
from matter_server.client.models.node import MatterEndpoint, MatterNode
# DoorLock Feature bitmap from Matter SDK
DoorLockFeature = clusters.DoorLock.Bitmaps.Feature
# --- TypedDicts for service action responses ---
class LockUserCredentialData(TypedDict):
"""Credential data within a user response."""
type: str
index: int | None
class LockUserData(TypedDict):
"""User data returned from lock queries."""
user_index: int | None
user_name: str | None
user_unique_id: int | None
user_status: str
user_type: str
credential_rule: str
credentials: list[LockUserCredentialData]
next_user_index: int | None
class SetLockUserResult(TypedDict):
"""Result of set_lock_user service action."""
user_index: int
class GetLockUsersResult(TypedDict):
"""Result of get_lock_users service action."""
max_users: int
users: list[LockUserData]
class GetLockInfoResult(TypedDict):
"""Result of get_lock_info service action."""
supports_user_management: bool
supported_credential_types: list[str]
max_users: int | None
max_pin_users: int | None
max_rfid_users: int | None
max_credentials_per_user: int | None
min_pin_length: int | None
max_pin_length: int | None
min_rfid_length: int | None
max_rfid_length: int | None
class SetLockCredentialResult(TypedDict):
"""Result of set_lock_credential service action."""
credential_index: int
user_index: int | None
next_credential_index: int | None
class GetLockCredentialStatusResult(TypedDict):
"""Result of get_lock_credential_status service action."""
credential_exists: bool
user_index: int | None
next_credential_index: int | None
def _get_lock_endpoint_from_node(node: MatterNode) -> MatterEndpoint | None:
"""Get the DoorLock endpoint from a node.
Returns the first endpoint that has the DoorLock cluster, or None if not found.
"""
for endpoint in node.endpoints.values():
if endpoint.has_cluster(clusters.DoorLock):
return endpoint
return None
def _get_feature_map(endpoint: MatterEndpoint) -> int | None:
"""Read the DoorLock FeatureMap attribute from an endpoint."""
value: int | None = endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.FeatureMap
)
return value
def _lock_supports_usr_feature(endpoint: MatterEndpoint) -> bool:
"""Check if lock endpoint supports USR (User) feature.
The USR feature indicates the lock supports user and credential management
commands like SetUser, GetUser, SetCredential, etc.
"""
feature_map = _get_feature_map(endpoint)
if feature_map is None:
return False
return bool(feature_map & DoorLockFeature.kUser)
# --- Pure utility functions ---
def _get_attr(obj: Any, attr: str) -> Any:
"""Get attribute from object or dict.
Matter SDK responses can be either dataclass objects or dicts depending on
the SDK version and serialization context.
"""
if isinstance(obj, dict):
return obj.get(attr)
return getattr(obj, attr, None)
def _get_supported_credential_types(feature_map: int) -> list[str]:
"""Get list of supported credential types from feature map."""
types = []
if feature_map & DoorLockFeature.kPinCredential:
types.append(CRED_TYPE_PIN)
if feature_map & DoorLockFeature.kRfidCredential:
types.append(CRED_TYPE_RFID)
if feature_map & DoorLockFeature.kFingerCredentials:
types.append(CRED_TYPE_FINGERPRINT)
if feature_map & DoorLockFeature.kFaceCredentials:
types.append(CRED_TYPE_FACE)
return types
def _format_user_response(user_data: Any) -> LockUserData | None:
"""Format GetUser response to API response format.
Returns None if the user slot is empty (no userStatus).
"""
if user_data is None:
return None
user_status = _get_attr(user_data, "userStatus")
if user_status is None:
return None
creds = _get_attr(user_data, "credentials")
credentials: list[LockUserCredentialData] = [
LockUserCredentialData(
type=CREDENTIAL_TYPE_MAP.get(_get_attr(cred, "credentialType"), "unknown"),
index=_get_attr(cred, "credentialIndex"),
)
for cred in (creds or [])
]
return LockUserData(
user_index=_get_attr(user_data, "userIndex"),
user_name=_get_attr(user_data, "userName"),
user_unique_id=_get_attr(user_data, "userUniqueID"),
user_status=USER_STATUS_MAP.get(user_status, "unknown"),
user_type=USER_TYPE_MAP.get(_get_attr(user_data, "userType"), "unknown"),
credential_rule=CREDENTIAL_RULE_MAP.get(
_get_attr(user_data, "credentialRule"), "unknown"
),
credentials=credentials,
next_user_index=_get_attr(user_data, "nextUserIndex"),
)
# --- Credential management helpers ---
async def _clear_user_credentials(
matter_client: MatterClient,
node_id: int,
endpoint_id: int,
user_index: int,
) -> None:
"""Clear all credentials for a specific user.
Fetches the user to get credential list, then clears each credential.
"""
get_user_response = await matter_client.send_device_command(
node_id=node_id,
endpoint_id=endpoint_id,
command=clusters.DoorLock.Commands.GetUser(userIndex=user_index),
)
creds = _get_attr(get_user_response, "credentials")
if not creds:
return
for cred in creds:
cred_type = _get_attr(cred, "credentialType")
cred_index = _get_attr(cred, "credentialIndex")
await matter_client.send_device_command(
node_id=node_id,
endpoint_id=endpoint_id,
command=clusters.DoorLock.Commands.ClearCredential(
credential=clusters.DoorLock.Structs.CredentialStruct(
credentialType=cred_type,
credentialIndex=cred_index,
),
),
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
class LockEndpointNotFoundError(HomeAssistantError):
"""Lock endpoint not found on node."""
class UsrFeatureNotSupportedError(ServiceValidationError):
"""Lock does not support USR (user management) feature."""
class UserSlotEmptyError(ServiceValidationError):
"""User slot is empty."""
class NoAvailableUserSlotsError(ServiceValidationError):
"""No available user slots on the lock."""
class CredentialTypeNotSupportedError(ServiceValidationError):
"""Lock does not support the requested credential type."""
class CredentialDataInvalidError(ServiceValidationError):
"""Credential data fails validation."""
class SetCredentialFailedError(HomeAssistantError):
"""SetCredential command returned a non-success status."""
def _get_lock_endpoint_or_raise(node: MatterNode) -> MatterEndpoint:
"""Get the DoorLock endpoint from a node or raise an error."""
lock_endpoint = _get_lock_endpoint_from_node(node)
if lock_endpoint is None:
raise LockEndpointNotFoundError("No lock endpoint found on this device")
return lock_endpoint
def _ensure_usr_support(lock_endpoint: MatterEndpoint) -> None:
"""Ensure the lock endpoint supports USR (user management) feature.
Raises UsrFeatureNotSupportedError if the lock doesn't support user management.
"""
if not _lock_supports_usr_feature(lock_endpoint):
raise UsrFeatureNotSupportedError(
"Lock does not support user/credential management"
)
# --- High-level business logic functions ---
async def get_lock_info(
matter_client: MatterClient,
node: MatterNode,
) -> GetLockInfoResult:
"""Get lock capabilities and configuration info.
Returns a typed dict with lock capability information.
Raises HomeAssistantError if lock endpoint not found.
"""
lock_endpoint = _get_lock_endpoint_or_raise(node)
supports_usr = _lock_supports_usr_feature(lock_endpoint)
# Get feature map for credential type detection
feature_map = (
lock_endpoint.get_attribute_value(None, clusters.DoorLock.Attributes.FeatureMap)
or 0
)
result = GetLockInfoResult(
supports_user_management=supports_usr,
supported_credential_types=_get_supported_credential_types(feature_map),
max_users=None,
max_pin_users=None,
max_rfid_users=None,
max_credentials_per_user=None,
min_pin_length=None,
max_pin_length=None,
min_rfid_length=None,
max_rfid_length=None,
)
# Populate capacity info if USR feature is supported
if supports_usr:
result["max_users"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.NumberOfTotalUsersSupported
)
result["max_pin_users"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.NumberOfPINUsersSupported
)
result["max_rfid_users"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.NumberOfRFIDUsersSupported
)
result["max_credentials_per_user"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.NumberOfCredentialsSupportedPerUser
)
result["min_pin_length"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MinPINCodeLength
)
result["max_pin_length"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MaxPINCodeLength
)
result["min_rfid_length"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MinRFIDCodeLength
)
result["max_rfid_length"] = lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MaxRFIDCodeLength
)
return result
async def set_lock_user(
matter_client: MatterClient,
node: MatterNode,
*,
user_index: int | None = None,
user_name: str | None = None,
user_unique_id: int | None = None,
user_status: str | None = None,
user_type: str | None = None,
credential_rule: str | None = None,
) -> SetLockUserResult:
"""Add or update a user on the lock.
When user_status, user_type, or credential_rule is None, defaults are used
for new users and existing values are preserved for modifications.
Returns typed dict with user_index on success.
Raises HomeAssistantError on failure.
"""
lock_endpoint = _get_lock_endpoint_or_raise(node)
_ensure_usr_support(lock_endpoint)
if user_index is None:
# Adding new user - find first available slot
max_users = (
lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.NumberOfTotalUsersSupported
)
or 0
)
for idx in range(1, max_users + 1):
get_user_response = await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.GetUser(userIndex=idx),
)
if _get_attr(get_user_response, "userStatus") is None:
user_index = idx
break
if user_index is None:
raise NoAvailableUserSlotsError("No available user slots on the lock")
user_status_enum = (
USER_STATUS_REVERSE_MAP.get(
user_status,
clusters.DoorLock.Enums.UserStatusEnum.kOccupiedEnabled,
)
if user_status is not None
else clusters.DoorLock.Enums.UserStatusEnum.kOccupiedEnabled
)
await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.SetUser(
operationType=clusters.DoorLock.Enums.DataOperationTypeEnum.kAdd,
userIndex=user_index,
userName=user_name,
userUniqueID=user_unique_id,
userStatus=user_status_enum,
userType=USER_TYPE_REVERSE_MAP.get(
user_type,
clusters.DoorLock.Enums.UserTypeEnum.kUnrestrictedUser,
)
if user_type is not None
else clusters.DoorLock.Enums.UserTypeEnum.kUnrestrictedUser,
credentialRule=CREDENTIAL_RULE_REVERSE_MAP.get(
credential_rule,
clusters.DoorLock.Enums.CredentialRuleEnum.kSingle,
)
if credential_rule is not None
else clusters.DoorLock.Enums.CredentialRuleEnum.kSingle,
),
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
else:
# Updating existing user - preserve existing values when not specified
get_user_response = await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.GetUser(userIndex=user_index),
)
if _get_attr(get_user_response, "userStatus") is None:
raise UserSlotEmptyError(f"User slot {user_index} is empty")
resolved_user_name = (
user_name
if user_name is not None
else _get_attr(get_user_response, "userName")
)
resolved_unique_id = (
user_unique_id
if user_unique_id is not None
else _get_attr(get_user_response, "userUniqueID")
)
resolved_status = (
USER_STATUS_REVERSE_MAP[user_status]
if user_status is not None
else _get_attr(get_user_response, "userStatus")
)
resolved_type = (
USER_TYPE_REVERSE_MAP[user_type]
if user_type is not None
else _get_attr(get_user_response, "userType")
)
resolved_rule = (
CREDENTIAL_RULE_REVERSE_MAP[credential_rule]
if credential_rule is not None
else _get_attr(get_user_response, "credentialRule")
)
await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.SetUser(
operationType=clusters.DoorLock.Enums.DataOperationTypeEnum.kModify,
userIndex=user_index,
userName=resolved_user_name,
userUniqueID=resolved_unique_id,
userStatus=resolved_status,
userType=resolved_type,
credentialRule=resolved_rule,
),
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
return SetLockUserResult(user_index=user_index)
async def get_lock_users(
matter_client: MatterClient,
node: MatterNode,
) -> GetLockUsersResult:
"""Get all users from the lock.
Returns typed dict with users list and max_users capacity.
Raises HomeAssistantError on failure.
"""
lock_endpoint = _get_lock_endpoint_or_raise(node)
_ensure_usr_support(lock_endpoint)
max_users = (
lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.NumberOfTotalUsersSupported
)
or 0
)
users: list[LockUserData] = []
current_index = 1
# Iterate through users using next_user_index for efficiency
while current_index is not None and current_index <= max_users:
get_user_response = await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.GetUser(
userIndex=current_index,
),
)
user_data = _format_user_response(get_user_response)
if user_data is not None:
users.append(user_data)
# Move to next user index
next_index = _get_attr(get_user_response, "nextUserIndex")
if next_index is None or next_index <= current_index:
break
current_index = next_index
return GetLockUsersResult(
max_users=max_users,
users=users,
)
async def clear_lock_user(
matter_client: MatterClient,
node: MatterNode,
user_index: int,
) -> None:
"""Clear a user from the lock, cleaning up credentials first.
Use index 0xFFFE (CLEAR_ALL_INDEX) to clear all users.
Raises HomeAssistantError on failure.
"""
lock_endpoint = _get_lock_endpoint_or_raise(node)
_ensure_usr_support(lock_endpoint)
if user_index == CLEAR_ALL_INDEX:
# Clear all: clear all credentials first, then all users
await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.ClearCredential(
credential=None,
),
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
else:
# Clear credentials for this specific user before deleting them
await _clear_user_credentials(
matter_client,
node.node_id,
lock_endpoint.endpoint_id,
user_index,
)
await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.ClearUser(
userIndex=user_index,
),
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
# --- Credential validation helpers ---
# Map credential type strings to the feature bit that must be set
_CREDENTIAL_TYPE_FEATURE_MAP: dict[str, int] = {
CRED_TYPE_PIN: DoorLockFeature.kPinCredential,
CRED_TYPE_RFID: DoorLockFeature.kRfidCredential,
CRED_TYPE_FINGERPRINT: DoorLockFeature.kFingerCredentials,
CRED_TYPE_FINGER_VEIN: DoorLockFeature.kFingerCredentials,
CRED_TYPE_FACE: DoorLockFeature.kFaceCredentials,
}
def _validate_credential_type_support(
lock_endpoint: MatterEndpoint, credential_type: str
) -> None:
"""Validate the lock supports the requested credential type.
Raises CredentialTypeNotSupportedError if not supported.
"""
required_bit = _CREDENTIAL_TYPE_FEATURE_MAP.get(credential_type)
if required_bit is None:
raise CredentialTypeNotSupportedError(
translation_domain="matter",
translation_key=ERR_CREDENTIAL_TYPE_NOT_SUPPORTED,
translation_placeholders={"credential_type": credential_type},
)
feature_map = _get_feature_map(lock_endpoint) or 0
if not (feature_map & required_bit):
raise CredentialTypeNotSupportedError(
translation_domain="matter",
translation_key=ERR_CREDENTIAL_TYPE_NOT_SUPPORTED,
translation_placeholders={"credential_type": credential_type},
)
def _validate_credential_data(
lock_endpoint: MatterEndpoint, credential_type: str, credential_data: str
) -> None:
"""Validate credential data against lock constraints.
For PIN: checks digits-only and length against Min/MaxPINCodeLength.
For RFID: checks valid hex and byte length against Min/MaxRFIDCodeLength.
Raises CredentialDataInvalidError on failure.
"""
if credential_type == CRED_TYPE_PIN:
if not credential_data.isdigit():
raise CredentialDataInvalidError(
translation_domain="matter",
translation_key=ERR_INVALID_CREDENTIAL_DATA,
translation_placeholders={"reason": "PIN must contain only digits"},
)
min_len = (
lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MinPINCodeLength
)
or 0
)
max_len = (
lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MaxPINCodeLength
)
or 255
)
if not min_len <= len(credential_data) <= max_len:
raise CredentialDataInvalidError(
translation_domain="matter",
translation_key=ERR_INVALID_CREDENTIAL_DATA,
translation_placeholders={
"reason": (f"PIN length must be between {min_len} and {max_len}")
},
)
elif credential_type == CRED_TYPE_RFID:
try:
rfid_bytes = bytes.fromhex(credential_data)
except ValueError as err:
raise CredentialDataInvalidError(
translation_domain="matter",
translation_key=ERR_INVALID_CREDENTIAL_DATA,
translation_placeholders={
"reason": "RFID data must be valid hexadecimal"
},
) from err
min_len = (
lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MinRFIDCodeLength
)
or 0
)
max_len = (
lock_endpoint.get_attribute_value(
None, clusters.DoorLock.Attributes.MaxRFIDCodeLength
)
or 255
)
if not min_len <= len(rfid_bytes) <= max_len:
raise CredentialDataInvalidError(
translation_domain="matter",
translation_key=ERR_INVALID_CREDENTIAL_DATA,
translation_placeholders={
"reason": (
f"RFID data length must be between"
f" {min_len} and {max_len} bytes"
)
},
)
def _credential_data_to_bytes(credential_type: str, credential_data: str) -> bytes:
"""Convert credential data string to bytes for the Matter command."""
if credential_type == CRED_TYPE_RFID:
return bytes.fromhex(credential_data)
# PIN and other types: encode as UTF-8
return credential_data.encode()
# --- Credential business logic functions ---
async def set_lock_credential(
matter_client: MatterClient,
node: MatterNode,
*,
credential_type: str,
credential_data: str,
credential_index: int | None = None,
user_index: int | None = None,
user_status: str | None = None,
user_type: str | None = None,
) -> SetLockCredentialResult:
"""Add or modify a credential on the lock.
Returns typed dict with credential_index, user_index, and next_credential_index.
Raises ServiceValidationError for validation failures.
Raises HomeAssistantError for device communication failures.
"""
lock_endpoint = _get_lock_endpoint_or_raise(node)
_ensure_usr_support(lock_endpoint)
_validate_credential_type_support(lock_endpoint, credential_type)
_validate_credential_data(lock_endpoint, credential_type, credential_data)
cred_type_int = CREDENTIAL_TYPE_REVERSE_MAP[credential_type]
cred_data_bytes = _credential_data_to_bytes(credential_type, credential_data)
# Determine operation type and credential index
operation_type = clusters.DoorLock.Enums.DataOperationTypeEnum.kAdd
if credential_index is None:
# Auto-find first available credential slot
max_creds = (
lock_endpoint.get_attribute_value(
None,
clusters.DoorLock.Attributes.NumberOfCredentialsSupportedPerUser,
)
or 5
)
for idx in range(1, max_creds + 1):
status_response = await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.GetCredentialStatus(
credential=clusters.DoorLock.Structs.CredentialStruct(
credentialType=cred_type_int,
credentialIndex=idx,
),
),
)
if not _get_attr(status_response, "credentialExists"):
credential_index = idx
break
if credential_index is None:
raise NoAvailableUserSlotsError("No available credential slots on the lock")
else:
# Check if slot is occupied to determine Add vs Modify
status_response = await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.GetCredentialStatus(
credential=clusters.DoorLock.Structs.CredentialStruct(
credentialType=cred_type_int,
credentialIndex=credential_index,
),
),
)
if _get_attr(status_response, "credentialExists"):
operation_type = clusters.DoorLock.Enums.DataOperationTypeEnum.kModify
# Resolve optional user_status and user_type enums
resolved_user_status = (
USER_STATUS_REVERSE_MAP.get(user_status) if user_status is not None else None
)
resolved_user_type = (
USER_TYPE_REVERSE_MAP.get(user_type) if user_type is not None else None
)
set_cred_response = await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.SetCredential(
operationType=operation_type,
credential=clusters.DoorLock.Structs.CredentialStruct(
credentialType=cred_type_int,
credentialIndex=credential_index,
),
credentialData=cred_data_bytes,
userIndex=user_index,
userStatus=resolved_user_status,
userType=resolved_user_type,
),
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
status_code = _get_attr(set_cred_response, "status")
status_str = SET_CREDENTIAL_STATUS_MAP.get(status_code, f"unknown({status_code})")
if status_str != "success":
raise SetCredentialFailedError(
translation_domain="matter",
translation_key="set_credential_failed",
translation_placeholders={"status": status_str},
)
return SetLockCredentialResult(
credential_index=credential_index,
user_index=_get_attr(set_cred_response, "userIndex"),
next_credential_index=_get_attr(set_cred_response, "nextCredentialIndex"),
)
async def clear_lock_credential(
matter_client: MatterClient,
node: MatterNode,
*,
credential_type: str,
credential_index: int,
) -> None:
"""Clear a credential from the lock.
Raises HomeAssistantError on failure.
"""
lock_endpoint = _get_lock_endpoint_or_raise(node)
_ensure_usr_support(lock_endpoint)
cred_type_int = CREDENTIAL_TYPE_REVERSE_MAP[credential_type]
await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.ClearCredential(
credential=clusters.DoorLock.Structs.CredentialStruct(
credentialType=cred_type_int,
credentialIndex=credential_index,
),
),
timed_request_timeout_ms=LOCK_TIMED_REQUEST_TIMEOUT_MS,
)
async def get_lock_credential_status(
matter_client: MatterClient,
node: MatterNode,
*,
credential_type: str,
credential_index: int,
) -> GetLockCredentialStatusResult:
"""Get the status of a credential slot on the lock.
Returns typed dict with credential_exists, user_index, next_credential_index.
Raises HomeAssistantError on failure.
"""
lock_endpoint = _get_lock_endpoint_or_raise(node)
_ensure_usr_support(lock_endpoint)
cred_type_int = CREDENTIAL_TYPE_REVERSE_MAP[credential_type]
response = await matter_client.send_device_command(
node_id=node.node_id,
endpoint_id=lock_endpoint.endpoint_id,
command=clusters.DoorLock.Commands.GetCredentialStatus(
credential=clusters.DoorLock.Structs.CredentialStruct(
credentialType=cred_type_int,
credentialIndex=credential_index,
),
),
)
return GetLockCredentialStatusResult(
credential_exists=bool(_get_attr(response, "credentialExists")),
user_index=_get_attr(response, "userIndex"),
next_credential_index=_get_attr(response, "nextCredentialIndex"),
)

View File

@@ -4,11 +4,27 @@ from __future__ import annotations
import voluptuous as vol
from homeassistant.components.lock import DOMAIN as LOCK_DOMAIN
from homeassistant.components.water_heater import DOMAIN as WATER_HEATER_DOMAIN
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import HomeAssistant, SupportsResponse, callback
from homeassistant.helpers import config_validation as cv, service
from .const import DOMAIN
from .const import (
ATTR_CREDENTIAL_DATA,
ATTR_CREDENTIAL_INDEX,
ATTR_CREDENTIAL_RULE,
ATTR_CREDENTIAL_TYPE,
ATTR_USER_INDEX,
ATTR_USER_NAME,
ATTR_USER_STATUS,
ATTR_USER_TYPE,
CLEAR_ALL_INDEX,
CREDENTIAL_RULE_REVERSE_MAP,
CREDENTIAL_TYPE_REVERSE_MAP,
DOMAIN,
SERVICE_CREDENTIAL_TYPES,
USER_TYPE_REVERSE_MAP,
)
ATTR_DURATION = "duration"
ATTR_EMERGENCY_BOOST = "emergency_boost"
@@ -36,3 +52,108 @@ def async_setup_services(hass: HomeAssistant) -> None:
},
func="async_set_boost",
)
# Lock services - Full user CRUD
service.async_register_platform_entity_service(
hass,
DOMAIN,
"set_lock_user",
entity_domain=LOCK_DOMAIN,
schema={
vol.Optional(ATTR_USER_INDEX): vol.All(vol.Coerce(int), vol.Range(min=1)),
vol.Optional(ATTR_USER_NAME): vol.Any(str, None),
vol.Optional(ATTR_USER_TYPE): vol.In(USER_TYPE_REVERSE_MAP.keys()),
vol.Optional(ATTR_CREDENTIAL_RULE): vol.In(
CREDENTIAL_RULE_REVERSE_MAP.keys()
),
},
func="async_set_lock_user",
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
"clear_lock_user",
entity_domain=LOCK_DOMAIN,
schema={
vol.Required(ATTR_USER_INDEX): vol.All(
vol.Coerce(int),
vol.Any(vol.Range(min=1), CLEAR_ALL_INDEX),
),
},
func="async_clear_lock_user",
)
# Lock services - Query operations
service.async_register_platform_entity_service(
hass,
DOMAIN,
"get_lock_info",
entity_domain=LOCK_DOMAIN,
schema={},
func="async_get_lock_info",
supports_response=SupportsResponse.ONLY,
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
"get_lock_users",
entity_domain=LOCK_DOMAIN,
schema={},
func="async_get_lock_users",
supports_response=SupportsResponse.ONLY,
)
# Lock services - Credential management
service.async_register_platform_entity_service(
hass,
DOMAIN,
"set_lock_credential",
entity_domain=LOCK_DOMAIN,
schema={
vol.Required(ATTR_CREDENTIAL_TYPE): vol.In(SERVICE_CREDENTIAL_TYPES),
vol.Required(ATTR_CREDENTIAL_DATA): str,
vol.Optional(ATTR_CREDENTIAL_INDEX): vol.All(
vol.Coerce(int), vol.Range(min=0)
),
vol.Optional(ATTR_USER_INDEX): vol.All(vol.Coerce(int), vol.Range(min=1)),
vol.Optional(ATTR_USER_STATUS): vol.In(
["occupied_enabled", "occupied_disabled"]
),
vol.Optional(ATTR_USER_TYPE): vol.In(USER_TYPE_REVERSE_MAP.keys()),
},
func="async_set_lock_credential",
supports_response=SupportsResponse.ONLY,
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
"clear_lock_credential",
entity_domain=LOCK_DOMAIN,
schema={
vol.Required(ATTR_CREDENTIAL_TYPE): vol.In(SERVICE_CREDENTIAL_TYPES),
vol.Required(ATTR_CREDENTIAL_INDEX): vol.All(
vol.Coerce(int), vol.Range(min=0)
),
},
func="async_clear_lock_credential",
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
"get_lock_credential_status",
entity_domain=LOCK_DOMAIN,
schema={
vol.Required(ATTR_CREDENTIAL_TYPE): vol.In(
CREDENTIAL_TYPE_REVERSE_MAP.keys()
),
vol.Required(ATTR_CREDENTIAL_INDEX): vol.All(
vol.Coerce(int), vol.Range(min=0)
),
},
func="async_get_lock_credential_status",
supports_response=SupportsResponse.ONLY,
)

View File

@@ -1,3 +1,177 @@
clear_lock_credential:
target:
entity:
domain: lock
integration: matter
fields:
credential_type:
selector:
select:
options:
- pin
- rfid
- fingerprint
- finger_vein
- face
required: true
credential_index:
selector:
number:
min: 0
max: 65534
step: 1
mode: box
required: true
clear_lock_user:
target:
entity:
domain: lock
integration: matter
fields:
user_index:
selector:
number:
min: 1
max: 65534
step: 1
mode: box
required: true
get_lock_credential_status:
target:
entity:
domain: lock
integration: matter
fields:
credential_type:
selector:
select:
options:
- programming_pin
- pin
- rfid
- fingerprint
- finger_vein
- face
- aliro_credential_issuer_key
- aliro_evictable_endpoint_key
- aliro_non_evictable_endpoint_key
required: true
credential_index:
selector:
number:
min: 0
max: 65534
step: 1
mode: box
required: true
get_lock_info:
target:
entity:
domain: lock
integration: matter
get_lock_users:
target:
entity:
domain: lock
integration: matter
set_lock_credential:
target:
entity:
domain: lock
integration: matter
fields:
credential_type:
selector:
select:
options:
- pin
- rfid
- fingerprint
- finger_vein
- face
required: true
credential_data:
selector:
text:
required: true
credential_index:
selector:
number:
min: 0
max: 65534
step: 1
mode: box
user_index:
selector:
number:
min: 1
max: 65534
step: 1
mode: box
user_status:
selector:
select:
options:
- occupied_enabled
- occupied_disabled
user_type:
selector:
select:
options:
- unrestricted_user
- year_day_schedule_user
- week_day_schedule_user
- programming_user
- non_access_user
- forced_user
- disposable_user
- expiring_user
- schedule_restricted_user
- remote_only_user
set_lock_user:
target:
entity:
domain: lock
integration: matter
fields:
user_index:
selector:
number:
min: 1
max: 255
step: 1
mode: box
user_name:
selector:
text:
user_type:
selector:
select:
options:
- unrestricted_user
- year_day_schedule_user
- week_day_schedule_user
- programming_user
- non_access_user
- forced_user
- disposable_user
- expiring_user
- schedule_restricted_user
- remote_only_user
credential_rule:
selector:
select:
options:
- single
- dual
- tri
water_heater_boost:
target:
entity:

View File

@@ -619,6 +619,17 @@
}
}
},
"exceptions": {
"credential_type_not_supported": {
"message": "The lock does not support credential type `{credential_type}`."
},
"invalid_credential_data": {
"message": "Invalid credential data: {reason}."
},
"set_credential_failed": {
"message": "Failed to set credential: lock returned status `{status}`."
}
},
"issues": {
"server_version_version_too_new": {
"description": "The version of the Matter Server you are currently running is too new for this version of Home Assistant. Please update Home Assistant or downgrade the Matter Server to an older version to fix this issue.",
@@ -630,6 +641,52 @@
}
},
"services": {
"clear_lock_credential": {
"description": "Removes a credential from the lock.",
"fields": {
"credential_index": {
"description": "The credential slot index to clear.",
"name": "Credential index"
},
"credential_type": {
"description": "The type of credential to clear.",
"name": "Credential type"
}
},
"name": "Clear lock credential"
},
"clear_lock_user": {
"description": "Deletes a lock user and all associated credentials. Use index 65534 to clear all users.",
"fields": {
"user_index": {
"description": "The user slot index (1-based) to clear, or 65534 to clear all.",
"name": "User index"
}
},
"name": "Clear lock user"
},
"get_lock_credential_status": {
"description": "Returns the status of a credential slot on the lock.",
"fields": {
"credential_index": {
"description": "The credential slot index to query.",
"name": "Credential index"
},
"credential_type": {
"description": "The type of credential to query.",
"name": "Credential type"
}
},
"name": "Get lock credential status"
},
"get_lock_info": {
"description": "Returns lock capabilities including supported credential types, user capacity, and PIN length constraints.",
"name": "Get lock info"
},
"get_lock_users": {
"description": "Returns all users configured on the lock with their credentials.",
"name": "Get lock users"
},
"open_commissioning_window": {
"description": "Allows adding one of your devices to another Matter network by opening the commissioning window for this Matter device for 60 seconds.",
"fields": {
@@ -640,6 +697,58 @@
},
"name": "Open commissioning window"
},
"set_lock_credential": {
"description": "Adds or updates a credential on the lock.",
"fields": {
"credential_data": {
"description": "The credential data. For PIN: digits only. For RFID: hexadecimal string.",
"name": "Credential data"
},
"credential_index": {
"description": "The credential slot index. Leave empty to auto-find an available slot.",
"name": "Credential index"
},
"credential_type": {
"description": "The type of credential (e.g., pin, rfid, fingerprint).",
"name": "Credential type"
},
"user_index": {
"description": "The user index to associate the credential with. Leave empty for automatic assignment.",
"name": "User index"
},
"user_status": {
"description": "The user status to set when creating a new user for this credential.",
"name": "User status"
},
"user_type": {
"description": "The user type to set when creating a new user for this credential.",
"name": "User type"
}
},
"name": "Set lock credential"
},
"set_lock_user": {
"description": "Creates or updates a lock user.",
"fields": {
"credential_rule": {
"description": "The credential rule for the user.",
"name": "Credential rule"
},
"user_index": {
"description": "The user slot index (1-based). Leave empty to auto-find an available slot.",
"name": "User index"
},
"user_name": {
"description": "The name for the user.",
"name": "User name"
},
"user_type": {
"description": "The type of user to create.",
"name": "User type"
}
},
"name": "Set lock user"
},
"water_heater_boost": {
"description": "Enables water heater boost for a specific duration.",
"fields": {

View File

@@ -128,6 +128,7 @@ class MonopriceZone(MediaPlayerEntity):
)
_attr_has_entity_name = True
_attr_name = None
_attr_volume_step = 1 / MAX_VOLUME
def __init__(self, monoprice, sources, namespace, zone_id):
"""Initialize new zone."""
@@ -211,17 +212,3 @@ class MonopriceZone(MediaPlayerEntity):
def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
self._monoprice.set_volume(self._zone_id, round(volume * MAX_VOLUME))
def volume_up(self) -> None:
"""Volume up the media player."""
if self.volume_level is None:
return
volume = round(self.volume_level * MAX_VOLUME)
self._monoprice.set_volume(self._zone_id, min(volume + 1, MAX_VOLUME))
def volume_down(self) -> None:
"""Volume down media player."""
if self.volume_level is None:
return
volume = round(self.volume_level * MAX_VOLUME)
self._monoprice.set_volume(self._zone_id, max(volume - 1, 0))

View File

@@ -93,6 +93,7 @@ class MpdDevice(MediaPlayerEntity):
_attr_media_content_type = MediaType.MUSIC
_attr_has_entity_name = True
_attr_name = None
_attr_volume_step = 0.05
def __init__(
self, server: str, port: int, password: str | None, unique_id: str
@@ -393,24 +394,6 @@ class MpdDevice(MediaPlayerEntity):
if "volume" in self._status:
await self._client.setvol(int(volume * 100))
async def async_volume_up(self) -> None:
"""Service to send the MPD the command for volume up."""
async with self.connection():
if "volume" in self._status:
current_volume = int(self._status["volume"])
if current_volume <= 100:
self._client.setvol(current_volume + 5)
async def async_volume_down(self) -> None:
"""Service to send the MPD the command for volume down."""
async with self.connection():
if "volume" in self._status:
current_volume = int(self._status["volume"])
if current_volume >= 0:
await self._client.setvol(current_volume - 5)
async def async_media_play(self) -> None:
"""Service to send the MPD the command for play/pause."""
async with self.connection():

View File

@@ -107,7 +107,6 @@ ABBREVIATIONS = {
"modes": "modes",
"name": "name",
"o": "origin",
"obj_id": "object_id",
"off_dly": "off_delay",
"on_cmd_type": "on_command_type",
"ops": "options",

View File

@@ -268,7 +268,6 @@ CONF_VIA_DEVICE = "via_device"
CONF_DEPRECATED_VIA_HUB = "via_hub"
CONF_SUGGESTED_AREA = "suggested_area"
CONF_CONFIGURATION_URL = "configuration_url"
CONF_OBJECT_ID = "object_id"
CONF_SUPPORT_URL = "support_url"
DEFAULT_ALARM_CONTROL_PANEL_COMMAND_TEMPLATE = "{{action}}"

View File

@@ -29,7 +29,6 @@ from homeassistant.const import (
CONF_MODEL_ID,
CONF_NAME,
CONF_UNIQUE_ID,
CONF_URL,
CONF_VALUE_TEMPLATE,
)
from homeassistant.core import Event, HassJobType, HomeAssistant, callback
@@ -84,8 +83,6 @@ from .const import (
CONF_JSON_ATTRS_TEMPLATE,
CONF_JSON_ATTRS_TOPIC,
CONF_MANUFACTURER,
CONF_OBJECT_ID,
CONF_ORIGIN,
CONF_PAYLOAD_AVAILABLE,
CONF_PAYLOAD_NOT_AVAILABLE,
CONF_QOS,
@@ -1412,58 +1409,12 @@ class MqttEntity(
"""Set entity_id from default_entity_id if defined in config."""
object_id: str
default_entity_id: str | None
# Setting the default entity_id through the CONF_OBJECT_ID is deprecated
# Support will be removed with HA Core 2026.4
if (
CONF_DEFAULT_ENTITY_ID not in self._config
and CONF_OBJECT_ID not in self._config
):
return
if (default_entity_id := self._config.get(CONF_DEFAULT_ENTITY_ID)) is None:
object_id = self._config[CONF_OBJECT_ID]
else:
_, _, object_id = default_entity_id.partition(".")
return
_, _, object_id = default_entity_id.partition(".")
self.entity_id = async_generate_entity_id(
self._entity_id_format, object_id, None, self.hass
)
if CONF_OBJECT_ID in self._config:
domain = self.entity_id.split(".")[0]
if not self._discovery:
async_create_issue(
self.hass,
DOMAIN,
self.entity_id,
issue_domain=DOMAIN,
is_fixable=False,
breaks_in_ha_version="2026.4",
severity=IssueSeverity.WARNING,
learn_more_url=f"{learn_more_url(domain)}#default_enity_id",
translation_placeholders={
"entity_id": self.entity_id,
"object_id": self._config[CONF_OBJECT_ID],
"domain": domain,
},
translation_key="deprecated_object_id",
)
elif CONF_DEFAULT_ENTITY_ID not in self._config:
if CONF_ORIGIN in self._config:
origin_name = self._config[CONF_ORIGIN][CONF_NAME]
url = self._config[CONF_ORIGIN].get(CONF_URL)
origin = f"[{origin_name}]({url})" if url else origin_name
else:
origin = "the integration"
_LOGGER.warning(
"The configuration for entity %s uses the deprecated option "
"`object_id` to set the default entity id. Replace the "
'`"object_id": "%s"` option with `"default_entity_id": '
'"%s"` in your published discovery configuration to fix this '
"issue, or contact the maintainer of %s that published this config "
"to fix this. This will stop working in Home Assistant Core 2026.4",
self.entity_id,
self._config[CONF_OBJECT_ID],
f"{domain}.{self._config[CONF_OBJECT_ID]}",
origin,
)
if self.unique_id is None:
return
@@ -1475,7 +1426,8 @@ class MqttEntity(
(entity_platform, DOMAIN, self.unique_id)
)
) and deleted_entry.entity_id != self.entity_id:
# Plan to update the entity_id basis on `object_id` if a deleted entity was found
# Plan to update the entity_id based on `default_entity_id`
# if a deleted entity was found
self._update_registry_entity_id = self.entity_id
@final

View File

@@ -42,7 +42,6 @@ from .const import (
CONF_JSON_ATTRS_TEMPLATE,
CONF_JSON_ATTRS_TOPIC,
CONF_MANUFACTURER,
CONF_OBJECT_ID,
CONF_ORIGIN,
CONF_PAYLOAD_AVAILABLE,
CONF_PAYLOAD_NOT_AVAILABLE,
@@ -173,7 +172,6 @@ MQTT_ENTITY_COMMON_SCHEMA = _MQTT_AVAILABILITY_SCHEMA.extend(
vol.Optional(CONF_JSON_ATTRS_TOPIC): valid_subscribe_topic,
vol.Optional(CONF_JSON_ATTRS_TEMPLATE): cv.template,
vol.Optional(CONF_DEFAULT_ENTITY_ID): cv.string,
vol.Optional(CONF_OBJECT_ID): cv.string,
vol.Optional(CONF_UNIQUE_ID): cv.string,
}
)

View File

@@ -1116,10 +1116,6 @@
}
},
"issues": {
"deprecated_object_id": {
"description": "Entity {entity_id} uses the `object_id` option which is deprecated. To fix the issue, replace the `object_id: {object_id}` option with `default_entity_id: {domain}.{object_id}` in your \"configuration.yaml\", and restart Home Assistant.",
"title": "Deprecated option object_id used"
},
"invalid_platform_config": {
"description": "Home Assistant detected an invalid config for a manually configured item.\n\nPlatform domain: **{domain}**\nConfiguration file: **{config_file}**\nNear line: **{line}**\nConfiguration found:\n```yaml\n{config}\n```\nError: **{error}**.\n\nMake sure the configuration is valid and [reload](/config/developer-tools/yaml) the manually configured MQTT items or restart Home Assistant to fix this issue.",
"title": "Invalid config found for MQTT {domain} item"

View File

@@ -198,8 +198,10 @@ class NADtcp(MediaPlayerEntity):
self._nad_receiver = NADReceiverTCP(config.get(CONF_HOST))
self._min_vol = (config[CONF_MIN_VOLUME] + 90) * 2 # from dB to nad vol (0-200)
self._max_vol = (config[CONF_MAX_VOLUME] + 90) * 2 # from dB to nad vol (0-200)
self._volume_step = config[CONF_VOLUME_STEP]
self._nad_volume = None
vol_range = self._max_vol - self._min_vol
if vol_range:
self._attr_volume_step = 2 * config[CONF_VOLUME_STEP] / vol_range
self._source_list = self._nad_receiver.available_sources()
def turn_off(self) -> None:
@@ -210,14 +212,6 @@ class NADtcp(MediaPlayerEntity):
"""Turn the media player on."""
self._nad_receiver.power_on()
def volume_up(self) -> None:
"""Step volume up in the configured increments."""
self._nad_receiver.set_volume(self._nad_volume + 2 * self._volume_step)
def volume_down(self) -> None:
"""Step volume down in the configured increments."""
self._nad_receiver.set_volume(self._nad_volume - 2 * self._volume_step)
def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
nad_volume_to_set = int(

View File

@@ -4,13 +4,19 @@ from __future__ import annotations
import asyncio
from powerfox import DeviceType, Powerfox, PowerfoxConnectionError
from powerfox import (
DeviceType,
Powerfox,
PowerfoxAuthenticationError,
PowerfoxConnectionError,
)
from homeassistant.const import CONF_EMAIL, CONF_PASSWORD, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
from .coordinator import (
PowerfoxConfigEntry,
PowerfoxDataUpdateCoordinator,
@@ -30,9 +36,18 @@ async def async_setup_entry(hass: HomeAssistant, entry: PowerfoxConfigEntry) ->
try:
devices = await client.all_devices()
except PowerfoxAuthenticationError as err:
await client.close()
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="auth_failed",
) from err
except PowerfoxConnectionError as err:
await client.close()
raise ConfigEntryNotReady from err
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="connection_error",
) from err
coordinators: list[
PowerfoxDataUpdateCoordinator | PowerfoxReportDataUpdateCoordinator

View File

@@ -59,18 +59,24 @@ class PowerfoxBaseCoordinator[T](DataUpdateCoordinator[T]):
except PowerfoxAuthenticationError as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,
translation_key="invalid_auth",
translation_placeholders={"error": str(err)},
translation_key="auth_failed",
) from err
except (
PowerfoxConnectionError,
PowerfoxNoDataError,
PowerfoxPrivacyError,
) as err:
except PowerfoxConnectionError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="update_failed",
translation_placeholders={"error": str(err)},
translation_key="connection_error",
) from err
except PowerfoxNoDataError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="no_data_error",
translation_placeholders={"device_name": self.device.name},
) from err
except PowerfoxPrivacyError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="privacy_error",
translation_placeholders={"device_name": self.device.name},
) from err
async def _async_fetch_data(self) -> T:

View File

@@ -116,11 +116,17 @@
}
},
"exceptions": {
"invalid_auth": {
"message": "Error while authenticating with the Powerfox service: {error}"
"auth_failed": {
"message": "Authentication with the Powerfox service failed. Please re-authenticate your account."
},
"update_failed": {
"message": "Error while updating the Powerfox service: {error}"
"connection_error": {
"message": "Could not connect to the Powerfox service. Please check your network connection."
},
"no_data_error": {
"message": "No data available for device \"{device_name}\". The device may not have reported data yet."
},
"privacy_error": {
"message": "Data for device \"{device_name}\" is restricted due to privacy settings in the Powerfox app."
}
}
}

View File

@@ -162,6 +162,11 @@
}
}
},
"exceptions": {
"missing_output_access_code": {
"message": "Cannot control switchable outputs because no user code is configured for this Satel Integra entry. Configure a code in the integration options to enable output control."
}
},
"issues": {
"deprecated_yaml_import_issue_cannot_connect": {
"description": "Configuring {integration_title} using YAML is being removed but there was an connection error importing your existing configuration.\n\nEnsure connection to {integration_title} works and restart Home Assistant to try again or remove the `{domain}` YAML configuration from your configuration.yaml file and add the {integration_title} integration manually.",

View File

@@ -8,9 +8,14 @@ from homeassistant.components.switch import SwitchEntity
from homeassistant.config_entries import ConfigSubentry
from homeassistant.const import CONF_CODE
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import CONF_SWITCHABLE_OUTPUT_NUMBER, SUBENTRY_TYPE_SWITCHABLE_OUTPUT
from .const import (
CONF_SWITCHABLE_OUTPUT_NUMBER,
DOMAIN,
SUBENTRY_TYPE_SWITCHABLE_OUTPUT,
)
from .coordinator import SatelConfigEntry, SatelIntegraOutputsCoordinator
from .entity import SatelIntegraEntity
@@ -83,12 +88,24 @@ class SatelIntegraSwitch(
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn the device on."""
if self._code is None:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="missing_output_access_code",
)
await self._controller.set_output(self._code, self._device_number, True)
self._attr_is_on = True
self.async_write_ha_state()
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the device off."""
if self._code is None:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="missing_output_access_code",
)
await self._controller.set_output(self._code, self._device_number, False)
self._attr_is_on = False
self.async_write_ha_state()

View File

@@ -66,6 +66,7 @@ from .repairs import (
from .services import async_setup_services
from .utils import (
async_create_issue_unsupported_firmware,
async_migrate_rpc_sensor_description_unique_ids,
async_migrate_rpc_virtual_components_unique_ids,
get_coap_context,
get_device_entry_gen,
@@ -296,6 +297,12 @@ async def _async_setup_rpc_entry(hass: HomeAssistant, entry: ShellyConfigEntry)
runtime_data = entry.runtime_data
runtime_data.platforms = RPC_SLEEPING_PLATFORMS
await er.async_migrate_entries(
hass,
entry.entry_id,
async_migrate_rpc_sensor_description_unique_ids,
)
if sleep_period == 0:
# Not a sleeping device, finish setup
LOGGER.debug("Setting up online RPC device %s", entry.title)

View File

@@ -1220,7 +1220,7 @@ RPC_SENSORS: Final = {
entity_category=EntityCategory.DIAGNOSTIC,
use_polling_coordinator=True,
),
"temperature_0": RpcSensorDescription(
"temperature_tc": RpcSensorDescription(
key="temperature",
sub_key="tC",
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
@@ -1249,7 +1249,7 @@ RPC_SENSORS: Final = {
entity_category=EntityCategory.DIAGNOSTIC,
use_polling_coordinator=True,
),
"humidity_0": RpcSensorDescription(
"humidity_rh": RpcSensorDescription(
key="humidity",
sub_key="rh",
native_unit_of_measurement=PERCENTAGE,

View File

@@ -969,6 +969,30 @@ def format_ble_addr(ble_addr: str) -> str:
return ble_addr.replace(":", "").upper()
@callback
def async_migrate_rpc_sensor_description_unique_ids(
entity_entry: er.RegistryEntry,
) -> dict[str, Any] | None:
"""Migrate RPC sensor unique_ids after sensor description key rename."""
unique_id_map = {
"-temperature_0": "-temperature_tc",
"-humidity_0": "-humidity_rh",
}
for old_suffix, new_suffix in unique_id_map.items():
if entity_entry.unique_id.endswith(old_suffix):
new_unique_id = entity_entry.unique_id.removesuffix(old_suffix) + new_suffix
LOGGER.debug(
"Migrating unique_id for %s entity from [%s] to [%s]",
entity_entry.entity_id,
entity_entry.unique_id,
new_unique_id,
)
return {"new_unique_id": new_unique_id}
return None
@callback
def async_migrate_rpc_virtual_components_unique_ids(
config: dict[str, Any], entity_entry: er.RegistryEntry

View File

@@ -219,6 +219,9 @@
"sanitizing_wash": {
"default": "mdi:lotion"
},
"sound_detection": {
"default": "mdi:home-sound-in"
},
"sound_effect": {
"default": "mdi:volume-high",
"state": {

View File

@@ -904,6 +904,9 @@
"sanitizing_wash": {
"name": "Sanitizing wash"
},
"sound_detection": {
"name": "Sound detection"
},
"sound_effect": {
"name": "Sound effect"
},
@@ -919,6 +922,20 @@
"wrinkle_prevent": {
"name": "Wrinkle prevent"
}
},
"vacuum": {
"vacuum": {
"state_attributes": {
"fan_speed": {
"state": {
"maximum": "Maximum",
"normal": "Normal",
"quiet": "Quiet",
"smart": "Smart"
}
}
}
}
}
},
"exceptions": {

View File

@@ -170,6 +170,15 @@ CAPABILITY_TO_SWITCHES: dict[Capability | str, SmartThingsSwitchEntityDescriptio
on_command=Command.DO_NOT_DISTURB_ON,
off_command=Command.DO_NOT_DISTURB_OFF,
),
Capability.SOUND_DETECTION: SmartThingsSwitchEntityDescription(
key=Capability.SOUND_DETECTION,
translation_key="sound_detection",
status_attribute=Attribute.SOUND_DETECTION_STATE,
entity_category=EntityCategory.CONFIG,
on_key="enabled",
on_command=Command.ENABLE_SOUND_DETECTION,
off_command=Command.DISABLE_SOUND_DETECTION,
),
}
DISHWASHER_WASHING_OPTIONS_TO_SWITCHES: dict[
Attribute | str, SmartThingsDishwasherWashingOptionSwitchEntityDescription

View File

@@ -22,6 +22,15 @@ from .entity import SmartThingsEntity
_LOGGER = logging.getLogger(__name__)
TURBO_MODE_TO_FAN_SPEED = {
"silence": "normal",
"on": "maximum",
"off": "smart",
"extraSilence": "quiet",
}
FAN_SPEED_TO_TURBO_MODE = {v: k for k, v in TURBO_MODE_TO_FAN_SPEED.items()}
async def async_setup_entry(
hass: HomeAssistant,
@@ -41,20 +50,26 @@ class SamsungJetBotVacuum(SmartThingsEntity, StateVacuumEntity):
"""Representation of a Vacuum."""
_attr_name = None
_attr_supported_features = (
VacuumEntityFeature.START
| VacuumEntityFeature.RETURN_HOME
| VacuumEntityFeature.PAUSE
| VacuumEntityFeature.STATE
)
_attr_translation_key = "vacuum"
def __init__(self, client: SmartThings, device: FullDevice) -> None:
"""Initialize the Samsung robot cleaner vacuum entity."""
super().__init__(
client,
device,
{Capability.SAMSUNG_CE_ROBOT_CLEANER_OPERATING_STATE},
{
Capability.SAMSUNG_CE_ROBOT_CLEANER_OPERATING_STATE,
Capability.ROBOT_CLEANER_TURBO_MODE,
},
)
self._attr_supported_features = (
VacuumEntityFeature.START
| VacuumEntityFeature.RETURN_HOME
| VacuumEntityFeature.PAUSE
| VacuumEntityFeature.STATE
)
if self.supports_capability(Capability.ROBOT_CLEANER_TURBO_MODE):
self._attr_supported_features |= VacuumEntityFeature.FAN_SPEED
@property
def activity(self) -> VacuumActivity | None:
@@ -74,6 +89,23 @@ class SamsungJetBotVacuum(SmartThingsEntity, StateVacuumEntity):
"charging": VacuumActivity.DOCKED,
}.get(status)
@property
def fan_speed_list(self) -> list[str]:
"""Return the list of available fan speeds."""
if not self.supports_capability(Capability.ROBOT_CLEANER_TURBO_MODE):
return []
return list(TURBO_MODE_TO_FAN_SPEED.values())
@property
def fan_speed(self) -> str | None:
"""Return the current fan speed."""
if not self.supports_capability(Capability.ROBOT_CLEANER_TURBO_MODE):
return None
turbo_mode = self.get_attribute_value(
Capability.ROBOT_CLEANER_TURBO_MODE, Attribute.ROBOT_CLEANER_TURBO_MODE
)
return TURBO_MODE_TO_FAN_SPEED.get(turbo_mode)
async def async_start(self) -> None:
"""Start the vacuum's operation."""
await self.execute_device_command(
@@ -93,3 +125,12 @@ class SamsungJetBotVacuum(SmartThingsEntity, StateVacuumEntity):
Capability.SAMSUNG_CE_ROBOT_CLEANER_OPERATING_STATE,
Command.RETURN_TO_HOME,
)
async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None:
"""Set the fan speed."""
turbo_mode = FAN_SPEED_TO_TURBO_MODE[fan_speed]
await self.execute_device_command(
Capability.ROBOT_CLEANER_TURBO_MODE,
Command.SET_ROBOT_CLEANER_TURBO_MODE,
turbo_mode,
)

View File

@@ -3,7 +3,7 @@
import asyncio
from collections.abc import Callable
from functools import partial
from typing import Final
from typing import Any, Final
from aiohttp import ClientError, ClientResponseError
from tesla_fleet_api.const import Scope
@@ -106,7 +106,7 @@ async def _get_access_token(oauth_session: OAuth2Session) -> str:
translation_domain=DOMAIN,
translation_key="not_ready_connection_error",
) from err
return oauth_session.token[CONF_ACCESS_TOKEN]
return str(oauth_session.token[CONF_ACCESS_TOKEN])
async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -> bool:
@@ -227,7 +227,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslemetryConfigEntry) -
stream=stream,
stream_vehicle=stream_vehicle,
vin=vin,
firmware=firmware,
firmware=firmware or "",
device=device,
)
)
@@ -398,10 +398,12 @@ async def async_migrate_entry(
return True
def create_handle_vehicle_stream(vin: str, coordinator) -> Callable[[dict], None]:
def create_handle_vehicle_stream(
vin: str, coordinator: TeslemetryVehicleDataCoordinator
) -> Callable[[dict[str, Any]], None]:
"""Create a handle vehicle stream function."""
def handle_vehicle_stream(data: dict) -> None:
def handle_vehicle_stream(data: dict[str, Any]) -> None:
"""Handle vehicle data from the stream."""
if "vehicle_data" in data:
LOGGER.debug("Streaming received vehicle data from %s", vin)
@@ -450,7 +452,7 @@ def async_setup_energy_device(
async def async_setup_stream(
hass: HomeAssistant, entry: TeslemetryConfigEntry, vehicle: TeslemetryVehicleData
):
) -> None:
"""Set up the stream for a vehicle."""
await vehicle.stream_vehicle.get_config()

View File

@@ -329,11 +329,11 @@ class TeslemetryStreamingClimateEntity(
)
)
def _async_handle_inside_temp(self, data: float | None):
def _async_handle_inside_temp(self, data: float | None) -> None:
self._attr_current_temperature = data
self.async_write_ha_state()
def _async_handle_hvac_power(self, data: str | None):
def _async_handle_hvac_power(self, data: str | None) -> None:
self._attr_hvac_mode = (
None
if data is None
@@ -343,15 +343,15 @@ class TeslemetryStreamingClimateEntity(
)
self.async_write_ha_state()
def _async_handle_climate_keeper_mode(self, data: str | None):
def _async_handle_climate_keeper_mode(self, data: str | None) -> None:
self._attr_preset_mode = PRESET_MODES.get(data) if data else None
self.async_write_ha_state()
def _async_handle_hvac_temperature_request(self, data: float | None):
def _async_handle_hvac_temperature_request(self, data: float | None) -> None:
self._attr_target_temperature = data
self.async_write_ha_state()
def _async_handle_rhd(self, data: bool | None):
def _async_handle_rhd(self, data: bool | None) -> None:
if data is not None:
self.rhd = data
@@ -538,15 +538,15 @@ class TeslemetryStreamingCabinOverheatProtectionEntity(
)
)
def _async_handle_inside_temp(self, value: float | None):
def _async_handle_inside_temp(self, value: float | None) -> None:
self._attr_current_temperature = value
self.async_write_ha_state()
def _async_handle_protection_mode(self, value: str | None):
def _async_handle_protection_mode(self, value: str | None) -> None:
self._attr_hvac_mode = COP_MODES.get(value) if value is not None else None
self.async_write_ha_state()
def _async_handle_temperature_limit(self, value: str | None):
def _async_handle_temperature_limit(self, value: str | None) -> None:
self._attr_target_temperature = (
COP_LEVELS.get(value) if value is not None else None
)

View File

@@ -70,7 +70,7 @@ class TeslemetryVehicleDataCoordinator(DataUpdateCoordinator[dict[str, Any]]):
hass: HomeAssistant,
config_entry: TeslemetryConfigEntry,
api: Vehicle,
product: dict,
product: dict[str, Any],
) -> None:
"""Initialize Teslemetry Vehicle Update Coordinator."""
super().__init__(
@@ -119,7 +119,7 @@ class TeslemetryEnergySiteLiveCoordinator(DataUpdateCoordinator[dict[str, Any]])
hass: HomeAssistant,
config_entry: TeslemetryConfigEntry,
api: EnergySite,
data: dict,
data: dict[str, Any],
) -> None:
"""Initialize Teslemetry Energy Site Live coordinator."""
super().__init__(
@@ -140,7 +140,7 @@ class TeslemetryEnergySiteLiveCoordinator(DataUpdateCoordinator[dict[str, Any]])
async def _async_update_data(self) -> dict[str, Any]:
"""Update energy site data using Teslemetry API."""
try:
data = (await self.api.live_status())["response"]
data: dict[str, Any] = (await self.api.live_status())["response"]
except (InvalidToken, SubscriptionRequired) as e:
raise ConfigEntryAuthFailed from e
except RETRY_EXCEPTIONS as e:
@@ -171,7 +171,7 @@ class TeslemetryEnergySiteInfoCoordinator(DataUpdateCoordinator[dict[str, Any]])
hass: HomeAssistant,
config_entry: TeslemetryConfigEntry,
api: EnergySite,
product: dict,
product: dict[str, Any],
) -> None:
"""Initialize Teslemetry Energy Info coordinator."""
super().__init__(

View File

@@ -199,7 +199,7 @@ class TeslemetryStreamingWindowEntity(
f"Adding field {signal} to {self.vehicle.vin}",
)
def _handle_stream_update(self, data) -> None:
def _handle_stream_update(self, data: dict[str, Any]) -> None:
"""Update the entity attributes."""
change = False

View File

@@ -28,7 +28,7 @@ class TeslemetryRootEntity(Entity):
_attr_has_entity_name = True
scoped: bool
def raise_for_scope(self, scope: Scope):
def raise_for_scope(self, scope: Scope) -> None:
"""Raise an error if a scope is not available."""
if not self.scoped:
raise ServiceValidationError(
@@ -231,11 +231,12 @@ class TeslemetryWallConnectorEntity(TeslemetryPollingEntity):
@property
def _value(self) -> StateType:
"""Return a specific wall connector value from coordinator data."""
return (
value: StateType = (
self.coordinator.data.get("wall_connectors", {})
.get(self.din, {})
.get(self.key)
)
return value
@property
def exists(self) -> bool:

View File

@@ -1,5 +1,6 @@
"""Teslemetry helper functions."""
from collections.abc import Awaitable
from typing import Any
from tesla_fleet_api.exceptions import TeslaFleetError
@@ -30,7 +31,7 @@ def flatten(
return result
async def handle_command(command) -> dict[str, Any]:
async def handle_command(command: Awaitable[dict[str, Any]]) -> dict[str, Any]:
"""Handle a command."""
try:
result = await command
@@ -44,7 +45,7 @@ async def handle_command(command) -> dict[str, Any]:
return result
async def handle_vehicle_command(command) -> Any:
async def handle_vehicle_command(command: Awaitable[dict[str, Any]]) -> Any:
"""Handle a vehicle command."""
result = await handle_command(command)
if (response := result.get("response")) is None:

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from dataclasses import dataclass, field
from tesla_fleet_api.const import Scope
from tesla_fleet_api.teslemetry import EnergySite, Vehicle
@@ -43,7 +43,7 @@ class TeslemetryVehicleData:
vin: str
firmware: str
device: DeviceInfo
wakelock = asyncio.Lock()
wakelock: asyncio.Lock = field(default_factory=asyncio.Lock)
@dataclass

View File

@@ -66,4 +66,4 @@ rules:
# Platinum
async-dependency: done
inject-websession: done
strict-typing: todo
strict-typing: done

View File

@@ -188,7 +188,7 @@ class TeslemetryStreamingUpdateEntity(
def _async_handle_software_update_download_percent_complete(
self, value: float | None
):
) -> None:
"""Handle software update download percent complete."""
self._download_percentage = round(value) if value is not None else 0
@@ -203,20 +203,22 @@ class TeslemetryStreamingUpdateEntity(
def _async_handle_software_update_installation_percent_complete(
self, value: float | None
):
) -> None:
"""Handle software update installation percent complete."""
self._install_percentage = round(value) if value is not None else 0
self._async_update_progress()
self.async_write_ha_state()
def _async_handle_software_update_scheduled_start_time(self, value: str | None):
def _async_handle_software_update_scheduled_start_time(
self, value: str | None
) -> None:
"""Handle software update scheduled start time."""
self._attr_in_progress = value is not None
self.async_write_ha_state()
def _async_handle_software_update_version(self, value: str | None):
def _async_handle_software_update_version(self, value: str | None) -> None:
"""Handle software update version."""
self._attr_latest_version = (
@@ -224,7 +226,7 @@ class TeslemetryStreamingUpdateEntity(
)
self.async_write_ha_state()
def _async_handle_version(self, value: str | None):
def _async_handle_version(self, value: str | None) -> None:
"""Handle version."""
if value is not None:

View File

@@ -7,5 +7,5 @@
"documentation": "https://www.home-assistant.io/integrations/weheat",
"integration_type": "hub",
"iot_class": "cloud_polling",
"requirements": ["weheat==2026.1.25"]
"requirements": ["weheat==2026.2.28"]
}

10
mypy.ini generated
View File

@@ -5208,6 +5208,16 @@ disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.teslemetry.*]
check_untyped_defs = true
disallow_incomplete_defs = true
disallow_subclassing_any = true
disallow_untyped_calls = true
disallow_untyped_decorators = true
disallow_untyped_defs = true
warn_return_any = true
warn_unreachable = true
[mypy-homeassistant.components.text.*]
check_untyped_defs = true
disallow_incomplete_defs = true

8
requirements_all.txt generated
View File

@@ -906,7 +906,7 @@ enocean==0.50
enturclient==0.2.4
# homeassistant.components.environment_canada
env-canada==0.12.4
env-canada==0.13.2
# homeassistant.components.season
ephem==4.1.6
@@ -1116,7 +1116,7 @@ goslide-api==0.7.0
gotailwind==0.3.0
# homeassistant.components.govee_ble
govee-ble==0.44.0
govee-ble==1.2.0
# homeassistant.components.govee_light_local
govee-local-api==2.3.0
@@ -2128,7 +2128,7 @@ pyhaversion==22.8.0
pyheos==1.0.6
# homeassistant.components.hive
pyhive-integration==1.0.7
pyhive-integration==1.0.8
# homeassistant.components.homematic
pyhomematic==0.1.77
@@ -3256,7 +3256,7 @@ webio-api==0.1.12
webmin-xmlrpc==0.0.2
# homeassistant.components.weheat
weheat==2026.1.25
weheat==2026.2.28
# homeassistant.components.whirlpool
whirlpool-sixth-sense==1.0.3

View File

@@ -797,7 +797,7 @@ energyzero==4.0.1
enocean==0.50
# homeassistant.components.environment_canada
env-canada==0.12.4
env-canada==0.13.2
# homeassistant.components.season
ephem==4.1.6
@@ -992,7 +992,7 @@ goslide-api==0.7.0
gotailwind==0.3.0
# homeassistant.components.govee_ble
govee-ble==0.44.0
govee-ble==1.2.0
# homeassistant.components.govee_light_local
govee-local-api==2.3.0
@@ -1817,7 +1817,7 @@ pyhaversion==22.8.0
pyheos==1.0.6
# homeassistant.components.hive
pyhive-integration==1.0.7
pyhive-integration==1.0.8
# homeassistant.components.homematic
pyhomematic==0.1.77
@@ -2741,7 +2741,7 @@ webio-api==0.1.12
webmin-xmlrpc==0.0.2
# homeassistant.components.weheat
weheat==2026.1.25
weheat==2026.2.28
# homeassistant.components.whirlpool
whirlpool-sixth-sense==1.0.3

View File

@@ -2,7 +2,7 @@
from collections.abc import AsyncGenerator, Generator, Iterable
import datetime
from unittest.mock import AsyncMock, patch
from unittest.mock import DEFAULT, AsyncMock, patch
from anthropic.pagination import AsyncPage
from anthropic.types import (
@@ -239,8 +239,10 @@ def mock_create_stream() -> Generator[AsyncMock]:
"anthropic.resources.messages.AsyncMessages.create",
new_callable=AsyncMock,
) as mock_create:
mock_create.side_effect = lambda **kwargs: mock_generator(
mock_create.return_value.pop(0), **kwargs
mock_create.side_effect = lambda **kwargs: (
mock_generator(mock_create.return_value.pop(0), **kwargs)
if isinstance(mock_create.return_value, list)
else DEFAULT
)
yield mock_create

View File

@@ -3,6 +3,7 @@
from pathlib import Path
from unittest.mock import AsyncMock, patch
from anthropic.types import Message, TextBlock, Usage
from freezegun import freeze_time
import pytest
from syrupy.assertion import SnapshotAssertion
@@ -71,7 +72,6 @@ async def test_empty_data(
mock_config_entry: MockConfigEntry,
mock_init_component,
mock_create_stream: AsyncMock,
entity_registry: er.EntityRegistry,
) -> None:
"""Test AI Task data generation but the data returned is empty."""
mock_create_stream.return_value = [create_content_block(0, [""])]
@@ -87,6 +87,31 @@ async def test_empty_data(
)
async def test_stream_wrong_type(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_init_component,
mock_create_stream: AsyncMock,
) -> None:
"""Test error if the response is not a stream."""
mock_create_stream.return_value = Message(
type="message",
id="message_id",
model="claude-opus-4-6",
role="assistant",
content=[TextBlock(type="text", text="This is not a stream")],
usage=Usage(input_tokens=42, output_tokens=42),
)
with pytest.raises(HomeAssistantError, match="Expected a stream of messages"):
await ai_task.async_generate_data(
hass,
task_name="Test Task",
entity_id="ai_task.claude_ai_task",
instructions="Generate test data",
)
@freeze_time("2026-01-01 12:00:00")
async def test_generate_structured_data_legacy(
hass: HomeAssistant,

View File

@@ -8,10 +8,13 @@ from anthropic import AuthenticationError, RateLimitError
from anthropic.types import (
CitationsWebSearchResultLocation,
CitationWebSearchResultLocationParam,
Message,
TextBlock,
TextEditorCodeExecutionCreateResultBlock,
TextEditorCodeExecutionStrReplaceResultBlock,
TextEditorCodeExecutionToolResultError,
TextEditorCodeExecutionViewResultBlock,
Usage,
WebSearchResultBlock,
)
from anthropic.types.text_editor_code_execution_tool_result_block import (
@@ -584,6 +587,68 @@ async def test_refusal(
)
async def test_stream_wrong_type(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_init_component,
mock_create_stream: AsyncMock,
) -> None:
"""Test error if the response is not a stream."""
mock_create_stream.return_value = Message(
type="message",
id="message_id",
model="claude-opus-4-6",
role="assistant",
content=[TextBlock(type="text", text="This is not a stream")],
usage=Usage(input_tokens=42, output_tokens=42),
)
result = await conversation.async_converse(
hass,
"Hi",
None,
Context(),
agent_id="conversation.claude_conversation",
)
assert result.response.response_type == intent.IntentResponseType.ERROR
assert result.response.error_code == "unknown"
assert result.response.speech["plain"]["speech"] == "Expected a stream of messages"
async def test_double_system_messages(
hass: HomeAssistant,
mock_config_entry_with_assist: MockConfigEntry,
mock_init_component,
mock_create_stream: AsyncMock,
) -> None:
"""Test error for two or more system prompts."""
conversation_id = "conversation_id"
with (
chat_session.async_get_chat_session(hass, conversation_id) as session,
conversation.async_get_chat_log(hass, session) as chat_log,
):
chat_log.content = [
conversation.chat_log.SystemContent("You are a helpful assistant."),
conversation.chat_log.SystemContent("And I am the user."),
]
result = await conversation.async_converse(
hass,
"What time is it?",
conversation_id,
Context(),
agent_id="conversation.claude_conversation",
)
assert result.response.response_type == intent.IntentResponseType.ERROR
assert result.response.error_code == "unknown"
assert (
result.response.speech["plain"]["speech"]
== "Unexpected content type in chat log"
)
async def test_extended_thinking(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,

View File

@@ -194,7 +194,7 @@ async def test_zone_services_with_ctl_id(
) -> None:
"""Test calling zone-only services with a non-zone entity_id fail."""
with pytest.raises(ServiceValidationError) as excinfo:
with pytest.raises(ServiceValidationError) as exc_info:
await hass.services.async_call(
DOMAIN,
service,
@@ -203,4 +203,5 @@ async def test_zone_services_with_ctl_id(
blocking=True,
)
assert excinfo.value.translation_key == "zone_only_service"
assert exc_info.value.translation_key == "zone_only_service"
assert exc_info.value.translation_placeholders == {"service": service}

View File

@@ -49,6 +49,9 @@
}
]
},
"merged_pull_request": {
"total": 42
},
"release": {
"name": "v1.0.0",
"url": "https://github.com/octocat/Hello-World/releases/v1.0.0",

File diff suppressed because it is too large Load Diff

View File

@@ -1330,257 +1330,6 @@ async def test_discover_alarm_control_panel(
].discovery_already_discovered
@pytest.mark.parametrize(
("topic", "config", "entity_id", "name", "domain", "deprecation_warning"),
[
(
"homeassistant/alarm_control_panel/object/bla/config",
'{ "name": "Hello World 1", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic" }',
"alarm_control_panel.hello_id",
"Hello World 1",
"alarm_control_panel",
True,
),
(
"homeassistant/binary_sensor/object/bla/config",
'{ "name": "Hello World 2", "obj_id": "hello_id", "state_topic": "test-topic" }',
"binary_sensor.hello_id",
"Hello World 2",
"binary_sensor",
True,
),
(
"homeassistant/button/object/bla/config",
'{ "name": "Hello World button", "obj_id": "hello_id", "command_topic": "test-topic" }',
"button.hello_id",
"Hello World button",
"button",
True,
),
(
"homeassistant/camera/object/bla/config",
'{ "name": "Hello World 3", "obj_id": "hello_id", "state_topic": "test-topic", "topic": "test-topic" }',
"camera.hello_id",
"Hello World 3",
"camera",
True,
),
(
"homeassistant/climate/object/bla/config",
'{ "name": "Hello World 4", "obj_id": "hello_id", "state_topic": "test-topic" }',
"climate.hello_id",
"Hello World 4",
"climate",
True,
),
(
"homeassistant/cover/object/bla/config",
'{ "name": "Hello World 5", "obj_id": "hello_id", "state_topic": "test-topic" }',
"cover.hello_id",
"Hello World 5",
"cover",
True,
),
(
"homeassistant/fan/object/bla/config",
'{ "name": "Hello World 6", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic" }',
"fan.hello_id",
"Hello World 6",
"fan",
True,
),
(
"homeassistant/humidifier/object/bla/config",
'{ "name": "Hello World 7", "obj_id": "hello_id", "state_topic": "test-topic", "target_humidity_command_topic": "test-topic", "command_topic": "test-topic" }',
"humidifier.hello_id",
"Hello World 7",
"humidifier",
True,
),
(
"homeassistant/number/object/bla/config",
'{ "name": "Hello World 8", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic" }',
"number.hello_id",
"Hello World 8",
"number",
True,
),
(
"homeassistant/scene/object/bla/config",
'{ "name": "Hello World 9", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic" }',
"scene.hello_id",
"Hello World 9",
"scene",
True,
),
(
"homeassistant/select/object/bla/config",
'{ "name": "Hello World 10", "obj_id": "hello_id", "state_topic": "test-topic", "options": [ "opt1", "opt2" ], "command_topic": "test-topic" }',
"select.hello_id",
"Hello World 10",
"select",
True,
),
(
"homeassistant/sensor/object/bla/config",
'{ "name": "Hello World 11", "obj_id": "hello_id", "state_topic": "test-topic" }',
"sensor.hello_id",
"Hello World 11",
"sensor",
True,
),
(
"homeassistant/switch/object/bla/config",
'{ "name": "Hello World 12", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic" }',
"switch.hello_id",
"Hello World 12",
"switch",
True,
),
(
"homeassistant/light/object/bla/config",
'{ "name": "Hello World 13", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic" }',
"light.hello_id",
"Hello World 13",
"light",
True,
),
(
"homeassistant/light/object/bla/config",
'{ "name": "Hello World 14", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic", "schema": "json" }',
"light.hello_id",
"Hello World 14",
"light",
True,
),
(
"homeassistant/light/object/bla/config",
'{ "name": "Hello World 15", "obj_id": "hello_id", "state_topic": "test-topic", "command_off_template": "template", "command_on_template": "template", "command_topic": "test-topic", "schema": "template" }',
"light.hello_id",
"Hello World 15",
"light",
True,
),
(
"homeassistant/vacuum/object/bla/config",
'{ "name": "Hello World 16", "obj_id": "hello_id", "state_topic": "test-topic", "schema": "state" }',
"vacuum.hello_id",
"Hello World 16",
"vacuum",
True,
),
(
"homeassistant/valve/object/bla/config",
'{ "name": "Hello World 17", "obj_id": "hello_id", "state_topic": "test-topic" }',
"valve.hello_id",
"Hello World 17",
"valve",
True,
),
(
"homeassistant/lock/object/bla/config",
'{ "name": "Hello World 18", "obj_id": "hello_id", "state_topic": "test-topic", "command_topic": "test-topic" }',
"lock.hello_id",
"Hello World 18",
"lock",
True,
),
(
"homeassistant/device_tracker/object/bla/config",
'{ "name": "Hello World 19", "obj_id": "hello_id", "state_topic": "test-topic" }',
"device_tracker.hello_id",
"Hello World 19",
"device_tracker",
True,
),
(
"homeassistant/binary_sensor/object/bla/config",
'{ "name": "Hello World 2", "obj_id": "hello_id", '
'"o": {"name": "X2mqtt"}, "state_topic": "test-topic" }',
"binary_sensor.hello_id",
"Hello World 2",
"binary_sensor",
True,
),
(
"homeassistant/button/object/bla/config",
'{ "name": "Hello World button", "obj_id": "hello_id", '
'"o": {"name": "X2mqtt", "url": "https://example.com/x2mqtt"}, '
'"command_topic": "test-topic" }',
"button.hello_id",
"Hello World button",
"button",
True,
),
(
"homeassistant/alarm_control_panel/object/bla/config",
'{ "name": "Hello World 1", "def_ent_id": "alarm_control_panel.hello_id", '
'"state_topic": "test-topic", "command_topic": "test-topic" }',
"alarm_control_panel.hello_id",
"Hello World 1",
"alarm_control_panel",
False,
),
(
"homeassistant/binary_sensor/object/bla/config",
'{ "name": "Hello World 2", "def_ent_id": "binary_sensor.hello_id", '
'"o": {"name": "X2mqtt"}, "state_topic": "test-topic" }',
"binary_sensor.hello_id",
"Hello World 2",
"binary_sensor",
False,
),
(
"homeassistant/button/object/bla/config",
'{ "name": "Hello World button", "def_ent_id": "button.hello_id", '
'"o": {"name": "X2mqtt", "url": "https://example.com/x2mqtt"}, '
'"command_topic": "test-topic" }',
"button.hello_id",
"Hello World button",
"button",
False,
),
(
"homeassistant/button/object/bla/config",
'{ "name": "Hello World button", "def_ent_id": "button.hello_id", '
'"obj_id": "hello_id_old", '
'"o": {"name": "X2mqtt", "url": "https://example.com/x2mqtt"}, '
'"command_topic": "test-topic" }',
"button.hello_id",
"Hello World button",
"button",
False,
),
],
)
async def test_discovery_with_object_id(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
caplog: pytest.LogCaptureFixture,
topic: str,
config: str,
entity_id: str,
name: str,
domain: str,
deprecation_warning: bool,
) -> None:
"""Test discovering an MQTT entity with object_id."""
await mqtt_mock_entry()
async_fire_mqtt_message(hass, topic, config)
await hass.async_block_till_done()
state = hass.states.get(entity_id)
assert state is not None
assert state.name == name
assert (domain, "object bla") in hass.data["mqtt"].discovery_already_discovered
assert (
f"The configuration for entity {domain}.hello_id uses the deprecated option `object_id`"
in caplog.text
) is deprecation_warning
async def test_discovery_with_default_entity_id_for_previous_deleted_entity(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,

View File

@@ -468,40 +468,6 @@ async def test_value_template_fails(
)
@pytest.mark.parametrize(
"hass_config",
[
{
mqtt.DOMAIN: {
sensor.DOMAIN: {
"name": "test",
"state_topic": "test-topic",
"object_id": "test",
}
}
},
],
)
async def test_deprecated_option_object_id_is_used_in_yaml(
hass: HomeAssistant, mqtt_mock_entry: MqttMockHAClientGenerator
) -> None:
"""Test issue registry in case the deprecated option object_id was used in YAML."""
await mqtt_mock_entry()
await hass.async_block_till_done()
state = hass.states.get("sensor.test")
assert state is not None
issue_registry = ir.async_get(hass)
issue = issue_registry.async_get_issue(mqtt.DOMAIN, "sensor.test")
assert issue is not None
assert issue.translation_placeholders == {
"entity_id": "sensor.test",
"object_id": "test",
"domain": "sensor",
}
@pytest.mark.parametrize(
"mqtt_config_subentries_data",
[

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
from unittest.mock import AsyncMock
from powerfox import PowerfoxAuthenticationError, PowerfoxConnectionError
import pytest
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
@@ -45,16 +46,20 @@ async def test_config_entry_not_ready(
assert mock_config_entry.state is ConfigEntryState.SETUP_RETRY
async def test_setup_entry_exception(
@pytest.mark.parametrize("method", ["all_devices", "device"])
async def test_config_entry_auth_failed(
hass: HomeAssistant,
mock_powerfox_client: AsyncMock,
mock_config_entry: MockConfigEntry,
method: str,
) -> None:
"""Test ConfigEntryNotReady when API raises an exception during entry setup."""
"""Test ConfigEntryAuthFailed when authentication fails."""
getattr(mock_powerfox_client, method).side_effect = PowerfoxAuthenticationError
mock_config_entry.add_to_hass(hass)
mock_powerfox_client.device.side_effect = PowerfoxAuthenticationError
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR
flows = hass.config_entries.flow.async_progress()

View File

@@ -6,7 +6,12 @@ from datetime import timedelta
from unittest.mock import AsyncMock, patch
from freezegun.api import FrozenDateTimeFactory
from powerfox import DeviceReport, PowerfoxConnectionError
from powerfox import (
DeviceReport,
PowerfoxConnectionError,
PowerfoxNoDataError,
PowerfoxPrivacyError,
)
import pytest
from syrupy.assertion import SnapshotAssertion
@@ -35,11 +40,16 @@ async def test_all_sensors(
await snapshot_platform(hass, entity_registry, snapshot, mock_config_entry.entry_id)
@pytest.mark.parametrize(
"exception",
[PowerfoxConnectionError, PowerfoxNoDataError, PowerfoxPrivacyError],
)
async def test_update_failed(
hass: HomeAssistant,
mock_powerfox_client: AsyncMock,
mock_config_entry: MockConfigEntry,
freezer: FrozenDateTimeFactory,
exception: Exception,
) -> None:
"""Test entities become unavailable after failed update."""
await setup_integration(hass, mock_config_entry)
@@ -47,7 +57,7 @@ async def test_update_failed(
assert hass.states.get("sensor.poweropti_energy_usage").state is not None
mock_powerfox_client.device.side_effect = PowerfoxConnectionError
mock_powerfox_client.device.side_effect = exception
freezer.tick(timedelta(minutes=5))
async_fire_time_changed(hass)
await hass.async_block_till_done()

View File

@@ -15,12 +15,14 @@ from homeassistant.components.switch import (
)
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_CODE,
STATE_OFF,
STATE_ON,
STATE_UNKNOWN,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.device_registry import DeviceRegistry
from homeassistant.helpers.entity_registry import EntityRegistry
@@ -176,3 +178,35 @@ async def test_switch_last_reported(
assert first_reported != hass.states.get("switch.switchable_output").last_reported
assert len(events) == 1 # last_reported shall not fire state_changed
async def test_switch_actions_require_code(
hass: HomeAssistant,
mock_satel: AsyncMock,
mock_config_entry_with_subentries: MockConfigEntry,
) -> None:
"""Test switch actions fail when access code is missing."""
await setup_integration(hass, mock_config_entry_with_subentries)
hass.config_entries.async_update_entry(
mock_config_entry_with_subentries, options={CONF_CODE: None}
)
await hass.async_block_till_done()
# Turning the device on or off should raise ServiceValidationError.
with pytest.raises(ServiceValidationError):
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_ON,
{ATTR_ENTITY_ID: "switch.switchable_output"},
blocking=True,
)
with pytest.raises(ServiceValidationError):
await hass.services.async_call(
SWITCH_DOMAIN,
SERVICE_TURN_OFF,
{ATTR_ENTITY_ID: "switch.switchable_output"},
blocking=True,
)

View File

@@ -5902,7 +5902,7 @@
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '123456789ABC-humidity:0-humidity_0',
'unique_id': '123456789ABC-humidity:0-humidity_rh',
'unit_of_measurement': '%',
})
# ---
@@ -6178,7 +6178,7 @@
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '123456789ABC-temperature:0-temperature_0',
'unique_id': '123456789ABC-temperature:0-temperature_tc',
'unit_of_measurement': <UnitOfTemperature.CELSIUS: '°C'>,
})
# ---
@@ -11364,7 +11364,7 @@
'suggested_object_id': None,
'supported_features': 0,
'translation_key': None,
'unique_id': '123456789ABC-temperature:0-temperature_0',
'unique_id': '123456789ABC-temperature:0-temperature_tc',
'unit_of_measurement': <UnitOfTemperature.CELSIUS: '°C'>,
})
# ---

View File

@@ -616,7 +616,7 @@ async def test_rpc_update_entry_sleep_period(
hass,
SENSOR_DOMAIN,
"test_name_temperature",
"temperature:0-temperature_0",
"temperature:0-temperature_tc",
entry,
)
@@ -650,7 +650,7 @@ async def test_rpc_sleeping_device_no_periodic_updates(
hass,
SENSOR_DOMAIN,
"test_name_temperature",
"temperature:0-temperature_0",
"temperature:0-temperature_tc",
entry,
)

View File

@@ -637,9 +637,6 @@ async def test_rpc_sleeping_sensor(
monkeypatch.setitem(mock_rpc_device.status["sys"], "wakeup_period", 1000)
await init_integration(hass, 2, sleep_period=1000)
# Sensor should be created when device is online
assert hass.states.get(entity_id) is None
# Make device online
mock_rpc_device.mock_online()
await hass.async_block_till_done(wait_background_tasks=True)
@@ -669,9 +666,6 @@ async def test_rpc_sleeping_sensor_with_channel_name(
monkeypatch.setitem(mock_rpc_device.status["sys"], "wakeup_period", 1000)
await init_integration(hass, 2, sleep_period=1000)
# Sensor should be created when device is online
assert hass.states.get(entity_id) is None
# Make device online
mock_rpc_device.mock_online()
await hass.async_block_till_done(wait_background_tasks=True)
@@ -700,7 +694,7 @@ async def test_rpc_restored_sleeping_sensor(
hass,
SENSOR_DOMAIN,
"test_name_temperature",
"temperature:0-temperature_0",
"temperature:0-temperature_tc",
entry,
device_id=device.id,
)
@@ -747,7 +741,7 @@ async def test_rpc_restored_sleeping_sensor_no_last_state(
hass,
SENSOR_DOMAIN,
"test_name_temperature",
"temperature:0-temperature_0",
"temperature:0-temperature_tc",
entry,
device_id=device.id,
)
@@ -824,9 +818,6 @@ async def test_rpc_sleeping_update_entity_service(
monkeypatch.setitem(mock_rpc_device.status["sys"], "wakeup_period", 1000)
await init_integration(hass, 2, sleep_period=1000)
# Entity should be created when device is online
assert hass.states.get(entity_id) is None
# Make device online
mock_rpc_device.mock_online()
await hass.async_block_till_done(wait_background_tasks=True)
@@ -846,7 +837,7 @@ async def test_rpc_sleeping_update_entity_service(
assert state.state == "22.9"
assert (entry := entity_registry.async_get(entity_id))
assert entry.unique_id == "123456789ABC-temperature:0-temperature_0"
assert entry.unique_id == "123456789ABC-temperature:0-temperature_tc"
assert (
"Entity sensor.test_name_temperature comes from a sleeping device"
@@ -1219,6 +1210,54 @@ async def test_migrate_unique_id_virtual_components_roles(
assert "Migrating unique_id for sensor.test_name_test_sensor" in caplog.text
@pytest.mark.parametrize(
("old_unique_id", "new_unique_id", "entity_id"),
[
(
"123456789ABC-temperature:0-temperature_0",
"123456789ABC-temperature:0-temperature_tc",
"sensor.test_name_temperature",
),
(
"123456789ABC-humidity:0-humidity_0",
"123456789ABC-humidity:0-humidity_rh",
"sensor.test_name_humidity",
),
],
)
@pytest.mark.usefixtures("disable_async_remove_shelly_rpc_entities")
async def test_migrate_unique_id_rpc_sensor_description_key_rename(
hass: HomeAssistant,
mock_rpc_device: Mock,
entity_registry: EntityRegistry,
caplog: pytest.LogCaptureFixture,
old_unique_id: str,
new_unique_id: str,
entity_id: str,
) -> None:
"""Test migration of RPC sensor unique_id after description key rename."""
entry = await init_integration(hass, 2, skip_setup=True)
entity = entity_registry.async_get_or_create(
suggested_object_id=entity_id.split(".")[1],
disabled_by=None,
domain=SENSOR_DOMAIN,
platform=DOMAIN,
unique_id=old_unique_id,
config_entry=entry,
)
assert entity.unique_id == old_unique_id
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
entity_entry = entity_registry.async_get(entity_id)
assert entity_entry
assert entity_entry.unique_id == new_unique_id
assert f"Migrating unique_id for {entity_id} entity" in caplog.text
@pytest.mark.usefixtures("disable_async_remove_shelly_rpc_entities")
async def test_rpc_remove_text_virtual_sensor_when_mode_field(
hass: HomeAssistant,

View File

@@ -100,7 +100,7 @@
},
"robotCleanerTurboMode": {
"robotCleanerTurboMode": {
"value": "off",
"value": "on",
"timestamp": "2026-02-27T10:49:04.309Z"
}
},

View File

@@ -11016,7 +11016,7 @@
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'off',
'state': 'on',
})
# ---
# name: test_all_entities[da_rvc_normal_000001][sensor.robot_vacuum_battery-entry]

View File

@@ -1028,6 +1028,55 @@
'state': 'on',
})
# ---
# name: test_all_entities[da_rvc_map_01011][switch.robot_vacuum_sound_detection-entry]
EntityRegistryEntrySnapshot({
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
'device_id': <ANY>,
'disabled_by': None,
'domain': 'switch',
'entity_category': <EntityCategory.CONFIG: 'config'>,
'entity_id': 'switch.robot_vacuum_sound_detection',
'has_entity_name': True,
'hidden_by': None,
'icon': None,
'id': <ANY>,
'labels': set({
}),
'name': None,
'object_id_base': 'Sound detection',
'options': dict({
}),
'original_device_class': None,
'original_icon': None,
'original_name': 'Sound detection',
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': 0,
'translation_key': 'sound_detection',
'unique_id': '01b28624-5907-c8bc-0325-8ad23f03a637_main_soundDetection_soundDetectionState_soundDetectionState',
'unit_of_measurement': None,
})
# ---
# name: test_all_entities[da_rvc_map_01011][switch.robot_vacuum_sound_detection-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Robot Vacuum Sound detection',
}),
'context': <ANY>,
'entity_id': 'switch.robot_vacuum_sound_detection',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'off',
})
# ---
# name: test_all_entities[da_rvc_normal_000001][switch.robot_vacuum-entry]
EntityRegistryEntrySnapshot({
'aliases': set({

View File

@@ -4,7 +4,14 @@
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'capabilities': dict({
'fan_speed_list': list([
'normal',
'maximum',
'smart',
'quiet',
]),
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
@@ -29,8 +36,8 @@
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <VacuumEntityFeature: 12308>,
'translation_key': None,
'supported_features': <VacuumEntityFeature: 12340>,
'translation_key': 'vacuum',
'unique_id': '01b28624-5907-c8bc-0325-8ad23f03a637_main',
'unit_of_measurement': None,
})
@@ -38,8 +45,15 @@
# name: test_all_entities[da_rvc_map_01011][vacuum.robot_vacuum-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'fan_speed': 'maximum',
'fan_speed_list': list([
'normal',
'maximum',
'smart',
'quiet',
]),
'friendly_name': 'Robot Vacuum',
'supported_features': <VacuumEntityFeature: 12308>,
'supported_features': <VacuumEntityFeature: 12340>,
}),
'context': <ANY>,
'entity_id': 'vacuum.robot_vacuum',
@@ -54,7 +68,14 @@
'aliases': set({
}),
'area_id': None,
'capabilities': None,
'capabilities': dict({
'fan_speed_list': list([
'normal',
'maximum',
'smart',
'quiet',
]),
}),
'config_entry_id': <ANY>,
'config_subentry_id': <ANY>,
'device_class': None,
@@ -79,8 +100,8 @@
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <VacuumEntityFeature: 12308>,
'translation_key': None,
'supported_features': <VacuumEntityFeature: 12340>,
'translation_key': 'vacuum',
'unique_id': '3442dfc6-17c0-a65f-dae0-4c6e01786f44_main',
'unit_of_measurement': None,
})
@@ -88,8 +109,15 @@
# name: test_all_entities[da_rvc_normal_000001][vacuum.robot_vacuum-state]
StateSnapshot({
'attributes': ReadOnlyDict({
'fan_speed': 'smart',
'fan_speed_list': list([
'normal',
'maximum',
'smart',
'quiet',
]),
'friendly_name': 'Robot vacuum',
'supported_features': <VacuumEntityFeature: 12308>,
'supported_features': <VacuumEntityFeature: 12340>,
}),
'context': <ANY>,
'entity_id': 'vacuum.robot_vacuum',

View File

@@ -9,9 +9,11 @@ from syrupy.assertion import SnapshotAssertion
from homeassistant.components.smartthings import MAIN
from homeassistant.components.vacuum import (
ATTR_FAN_SPEED,
DOMAIN as VACUUM_DOMAIN,
SERVICE_PAUSE,
SERVICE_RETURN_TO_BASE,
SERVICE_SET_FAN_SPEED,
SERVICE_START,
VacuumActivity,
)
@@ -131,3 +133,52 @@ async def test_availability_at_start(
"""Test unavailable at boot."""
await setup_integration(hass, mock_config_entry)
assert hass.states.get("vacuum.robot_vacuum").state == STATE_UNAVAILABLE
@pytest.mark.parametrize("device_fixture", ["da_rvc_map_01011"])
async def test_fan_speed_update(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test fan speed state update."""
await setup_integration(hass, mock_config_entry)
assert (
hass.states.get("vacuum.robot_vacuum").attributes[ATTR_FAN_SPEED] == "maximum"
)
await trigger_update(
hass,
devices,
"01b28624-5907-c8bc-0325-8ad23f03a637",
Capability.ROBOT_CLEANER_TURBO_MODE,
Attribute.ROBOT_CLEANER_TURBO_MODE,
"extraSilence",
)
assert hass.states.get("vacuum.robot_vacuum").attributes[ATTR_FAN_SPEED] == "quiet"
@pytest.mark.parametrize("device_fixture", ["da_rvc_map_01011"])
async def test_vacuum_set_fan_speed(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test setting fan speed."""
await setup_integration(hass, mock_config_entry)
await hass.services.async_call(
VACUUM_DOMAIN,
SERVICE_SET_FAN_SPEED,
{ATTR_ENTITY_ID: "vacuum.robot_vacuum", ATTR_FAN_SPEED: "normal"},
blocking=True,
)
devices.execute_device_command.assert_called_once_with(
"01b28624-5907-c8bc-0325-8ad23f03a637",
Capability.ROBOT_CLEANER_TURBO_MODE,
Command.SET_ROBOT_CLEANER_TURBO_MODE,
MAIN,
"silence",
)