Make Tuya find_dpcode a class method (#158028)

This commit is contained in:
epenet
2025-12-07 22:32:14 +01:00
committed by GitHub
parent ca31134caa
commit 9cb9efeb88
3 changed files with 84 additions and 116 deletions
+19 -16
View File
@@ -9,12 +9,15 @@ from tuya_sharing import CustomerDevice
from homeassistant.util.json import json_loads
from .const import DPType
from .type_information import (
BitmapTypeInformation,
BooleanTypeInformation,
EnumTypeInformation,
IntegerTypeInformation,
JsonTypeInformation,
RawTypeInformation,
StringTypeInformation,
TypeInformation,
find_dpcode,
)
@@ -79,7 +82,7 @@ class DPCodeWrapper(DeviceWrapper):
class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper):
"""Base DPCode wrapper with Type Information."""
DPTYPE: DPType
_DPTYPE: type[T]
type_information: T
def __init__(self, dpcode: str, type_information: T) -> None:
@@ -102,8 +105,8 @@ class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper):
prefer_function: bool = False,
) -> Self | None:
"""Find and return a DPCodeTypeInformationWrapper for the given DP codes."""
if type_information := find_dpcode( # type: ignore[call-overload]
device, dpcodes, dptype=cls.DPTYPE, prefer_function=prefer_function
if type_information := cls._DPTYPE.find_dpcode(
device, dpcodes, prefer_function=prefer_function
):
return cls(
dpcode=type_information.dpcode, type_information=type_information
@@ -111,10 +114,10 @@ class DPCodeTypeInformationWrapper[T: TypeInformation](DPCodeWrapper):
return None
class DPCodeBase64Wrapper(DPCodeTypeInformationWrapper[TypeInformation]):
class DPCodeBase64Wrapper(DPCodeTypeInformationWrapper[RawTypeInformation]):
"""Wrapper to extract information from a RAW/binary value."""
DPTYPE = DPType.RAW
_DPTYPE = RawTypeInformation
def read_bytes(self, device: CustomerDevice) -> bytes | None:
"""Read the device value for the dpcode."""
@@ -125,13 +128,13 @@ class DPCodeBase64Wrapper(DPCodeTypeInformationWrapper[TypeInformation]):
return decoded
class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[BooleanTypeInformation]):
"""Simple wrapper for boolean values.
Supports True/False only.
"""
DPTYPE = DPType.BOOLEAN
_DPTYPE = BooleanTypeInformation
def _convert_value_to_raw_value(
self, device: CustomerDevice, value: Any
@@ -144,10 +147,10 @@ class DPCodeBooleanWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
raise ValueError(f"Invalid boolean value `{value}`")
class DPCodeJsonWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
class DPCodeJsonWrapper(DPCodeTypeInformationWrapper[JsonTypeInformation]):
"""Wrapper to extract information from a JSON value."""
DPTYPE = DPType.JSON
_DPTYPE = JsonTypeInformation
def read_json(self, device: CustomerDevice) -> Any | None:
"""Read the device value for the dpcode."""
@@ -159,7 +162,7 @@ class DPCodeJsonWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
class DPCodeEnumWrapper(DPCodeTypeInformationWrapper[EnumTypeInformation]):
"""Simple wrapper for EnumTypeInformation values."""
DPTYPE = DPType.ENUM
_DPTYPE = EnumTypeInformation
def _convert_value_to_raw_value(self, device: CustomerDevice, value: Any) -> Any:
"""Convert a Home Assistant value back to a raw device value."""
@@ -175,7 +178,7 @@ class DPCodeEnumWrapper(DPCodeTypeInformationWrapper[EnumTypeInformation]):
class DPCodeIntegerWrapper(DPCodeTypeInformationWrapper[IntegerTypeInformation]):
"""Simple wrapper for IntegerTypeInformation values."""
DPTYPE = DPType.INTEGER
_DPTYPE = IntegerTypeInformation
def __init__(self, dpcode: str, type_information: IntegerTypeInformation) -> None:
"""Init DPCodeIntegerWrapper."""
@@ -195,10 +198,10 @@ class DPCodeIntegerWrapper(DPCodeTypeInformationWrapper[IntegerTypeInformation])
)
class DPCodeStringWrapper(DPCodeTypeInformationWrapper[TypeInformation]):
class DPCodeStringWrapper(DPCodeTypeInformationWrapper[StringTypeInformation]):
"""Wrapper to extract information from a STRING value."""
DPTYPE = DPType.STRING
_DPTYPE = StringTypeInformation
class DPCodeBitmapBitWrapper(DPCodeWrapper):
@@ -225,7 +228,7 @@ class DPCodeBitmapBitWrapper(DPCodeWrapper):
) -> Self | None:
"""Find and return a DPCodeBitmapBitWrapper for the given DP codes."""
if (
type_information := find_dpcode(device, dpcodes, dptype=DPType.BITMAP)
type_information := BitmapTypeInformation.find_dpcode(device, dpcodes)
) and bitmap_key in type_information.label:
return cls(
type_information.dpcode, type_information.label.index(bitmap_key)
+1 -2
View File
@@ -36,7 +36,6 @@ from .const import (
TUYA_DISCOVERY_NEW,
DeviceCategory,
DPCode,
DPType,
)
from .entity import TuyaEntity
from .models import (
@@ -54,7 +53,7 @@ from .type_information import EnumTypeInformation
class _WindDirectionWrapper(DPCodeTypeInformationWrapper[EnumTypeInformation]):
"""Custom DPCode Wrapper for converting enum to wind direction."""
DPTYPE = DPType.ENUM
_DPTYPE = EnumTypeInformation
_WIND_DIRECTIONS = {
"north": 0.0,
@@ -3,7 +3,7 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Literal, Self, cast, overload
from typing import Any, ClassVar, Self, cast
from tuya_sharing import CustomerDevice
@@ -38,6 +38,7 @@ class TypeInformation[T]:
As provided by the SDK, from `device.function` / `device.status_range`.
"""
_DPTYPE: ClassVar[DPType]
dpcode: str
type_data: str | None = None
@@ -52,19 +53,57 @@ class TypeInformation[T]:
return raw_value
@classmethod
def from_json(cls, dpcode: str, type_data: str) -> Self | None:
def _from_json(cls, dpcode: str, type_data: str) -> Self | None:
"""Load JSON string and return a TypeInformation object."""
return cls(dpcode=dpcode, type_data=type_data)
@classmethod
def find_dpcode(
cls,
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
) -> Self | None:
"""Find type information for a matching DP code available for this device."""
if dpcodes is None:
return None
if not isinstance(dpcodes, tuple):
dpcodes = (dpcodes,)
lookup_tuple = (
(device.function, device.status_range)
if prefer_function
else (device.status_range, device.function)
)
for dpcode in dpcodes:
for device_specs in lookup_tuple:
if (
(current_definition := device_specs.get(dpcode))
and parse_dptype(current_definition.type) is cls._DPTYPE
and (
type_information := cls._from_json(
dpcode=dpcode, type_data=current_definition.values
)
)
):
return type_information
return None
@dataclass(kw_only=True)
class BitmapTypeInformation(TypeInformation[int]):
"""Bitmap type information."""
_DPTYPE = DPType.BITMAP
label: list[str]
@classmethod
def from_json(cls, dpcode: str, type_data: str) -> Self | None:
def _from_json(cls, dpcode: str, type_data: str) -> Self | None:
"""Load JSON string and return a BitmapTypeInformation object."""
if not (parsed := json_loads_object(type_data)):
return None
@@ -79,6 +118,8 @@ class BitmapTypeInformation(TypeInformation[int]):
class BooleanTypeInformation(TypeInformation[bool]):
"""Boolean type information."""
_DPTYPE = DPType.BOOLEAN
def process_raw_value(
self, raw_value: Any | None, device: CustomerDevice
) -> bool | None:
@@ -107,6 +148,8 @@ class BooleanTypeInformation(TypeInformation[bool]):
class EnumTypeInformation(TypeInformation[str]):
"""Enum type information."""
_DPTYPE = DPType.ENUM
range: list[str]
def process_raw_value(
@@ -133,7 +176,7 @@ class EnumTypeInformation(TypeInformation[str]):
return raw_value
@classmethod
def from_json(cls, dpcode: str, type_data: str) -> Self | None:
def _from_json(cls, dpcode: str, type_data: str) -> Self | None:
"""Load JSON string and return an EnumTypeInformation object."""
if not (parsed := json_loads_object(type_data)):
return None
@@ -148,6 +191,8 @@ class EnumTypeInformation(TypeInformation[str]):
class IntegerTypeInformation(TypeInformation[float]):
"""Integer type information."""
_DPTYPE = DPType.INTEGER
min: int
max: int
scale: int
@@ -223,7 +268,7 @@ class IntegerTypeInformation(TypeInformation[float]):
return raw_value / (10**self.scale)
@classmethod
def from_json(cls, dpcode: str, type_data: str) -> Self | None:
def _from_json(cls, dpcode: str, type_data: str) -> Self | None:
"""Load JSON string and return an IntegerTypeInformation object."""
if not (parsed := cast(dict[str, Any] | None, json_loads_object(type_data))):
return None
@@ -239,101 +284,22 @@ class IntegerTypeInformation(TypeInformation[float]):
)
_TYPE_INFORMATION_MAPPINGS: dict[DPType, type[TypeInformation]] = {
DPType.BITMAP: BitmapTypeInformation,
DPType.BOOLEAN: BooleanTypeInformation,
DPType.ENUM: EnumTypeInformation,
DPType.INTEGER: IntegerTypeInformation,
DPType.JSON: TypeInformation,
DPType.RAW: TypeInformation,
DPType.STRING: TypeInformation,
}
@dataclass(kw_only=True)
class JsonTypeInformation(TypeInformation[Any]):
"""Json type information."""
_DPTYPE = DPType.JSON
@overload
def find_dpcode(
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.BITMAP],
) -> BitmapTypeInformation | None: ...
@dataclass(kw_only=True)
class RawTypeInformation(TypeInformation[Any]):
"""Raw type information."""
_DPTYPE = DPType.RAW
@overload
def find_dpcode(
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.BOOLEAN],
) -> BooleanTypeInformation | None: ...
@dataclass(kw_only=True)
class StringTypeInformation(TypeInformation[str]):
"""String type information."""
@overload
def find_dpcode(
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.ENUM],
) -> EnumTypeInformation | None: ...
@overload
def find_dpcode(
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.INTEGER],
) -> IntegerTypeInformation | None: ...
@overload
def find_dpcode(
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: Literal[DPType.JSON, DPType.RAW],
) -> TypeInformation | None: ...
def find_dpcode(
device: CustomerDevice,
dpcodes: str | tuple[str, ...] | None,
*,
prefer_function: bool = False,
dptype: DPType,
) -> TypeInformation | None:
"""Find type information for a matching DP code available for this device."""
if not (type_information_cls := _TYPE_INFORMATION_MAPPINGS.get(dptype)):
raise NotImplementedError(f"find_dpcode not supported for {dptype}")
if dpcodes is None:
return None
if not isinstance(dpcodes, tuple):
dpcodes = (dpcodes,)
lookup_tuple = (
(device.function, device.status_range)
if prefer_function
else (device.status_range, device.function)
)
for dpcode in dpcodes:
for device_specs in lookup_tuple:
if (
(current_definition := device_specs.get(dpcode))
and parse_dptype(current_definition.type) is dptype
and (
type_information := type_information_cls.from_json(
dpcode=dpcode, type_data=current_definition.values
)
)
):
return type_information
return None
_DPTYPE = DPType.STRING