Enable strict typing for isy994 (#65439)

Co-authored-by: Martin Hjelmare <marhje52@gmail.com>
This commit is contained in:
J. Nick Koston
2022-02-03 10:02:05 -06:00
committed by GitHub
parent 2f0d0998a2
commit 6c38a6b569
18 changed files with 351 additions and 279 deletions

View File

@ -1,7 +1,8 @@
"""Support for ISY994 binary sensors."""
from __future__ import annotations
from datetime import timedelta
from datetime import datetime, timedelta
from typing import Any
from pyisy.constants import (
CMD_OFF,
@ -10,6 +11,7 @@ from pyisy.constants import (
PROTO_INSTEON,
PROTO_ZWAVE,
)
from pyisy.helpers import NodeProperty
from pyisy.nodes import Group, Node
from homeassistant.components.binary_sensor import (
@ -18,7 +20,7 @@ from homeassistant.components.binary_sensor import (
BinarySensorEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.util import dt as dt_util
@ -55,12 +57,25 @@ async def async_setup_entry(
hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
) -> None:
"""Set up the ISY994 binary sensor platform."""
devices = []
devices_by_address = {}
child_nodes = []
entities: list[
ISYInsteonBinarySensorEntity
| ISYBinarySensorEntity
| ISYBinarySensorHeartbeat
| ISYBinarySensorProgramEntity
] = []
entities_by_address: dict[
str,
ISYInsteonBinarySensorEntity
| ISYBinarySensorEntity
| ISYBinarySensorHeartbeat
| ISYBinarySensorProgramEntity,
] = {}
child_nodes: list[tuple[Node, str | None, str | None]] = []
entity: ISYInsteonBinarySensorEntity | ISYBinarySensorEntity | ISYBinarySensorHeartbeat | ISYBinarySensorProgramEntity
hass_isy_data = hass.data[ISY994_DOMAIN][entry.entry_id]
for node in hass_isy_data[ISY994_NODES][BINARY_SENSOR]:
assert isinstance(node, Node)
device_class, device_type = _detect_device_type_and_class(node)
if node.protocol == PROTO_INSTEON:
if node.parent_node is not None:
@ -68,38 +83,38 @@ async def async_setup_entry(
# nodes have been processed
child_nodes.append((node, device_class, device_type))
continue
device = ISYInsteonBinarySensorEntity(node, device_class)
entity = ISYInsteonBinarySensorEntity(node, device_class)
else:
device = ISYBinarySensorEntity(node, device_class)
devices.append(device)
devices_by_address[node.address] = device
entity = ISYBinarySensorEntity(node, device_class)
entities.append(entity)
entities_by_address[node.address] = entity
# Handle some special child node cases for Insteon Devices
for (node, device_class, device_type) in child_nodes:
subnode_id = int(node.address.split(" ")[-1], 16)
# Handle Insteon Thermostats
if device_type.startswith(TYPE_CATEGORY_CLIMATE):
if device_type is not None and device_type.startswith(TYPE_CATEGORY_CLIMATE):
if subnode_id == SUBNODE_CLIMATE_COOL:
# Subnode 2 is the "Cool Control" sensor
# It never reports its state until first use is
# detected after an ISY Restart, so we assume it's off.
# As soon as the ISY Event Stream connects if it has a
# valid state, it will be set.
device = ISYInsteonBinarySensorEntity(
entity = ISYInsteonBinarySensorEntity(
node, BinarySensorDeviceClass.COLD, False
)
devices.append(device)
entities.append(entity)
elif subnode_id == SUBNODE_CLIMATE_HEAT:
# Subnode 3 is the "Heat Control" sensor
device = ISYInsteonBinarySensorEntity(
entity = ISYInsteonBinarySensorEntity(
node, BinarySensorDeviceClass.HEAT, False
)
devices.append(device)
entities.append(entity)
continue
if device_class in DEVICE_PARENT_REQUIRED:
parent_device = devices_by_address.get(node.parent_node.address)
if not parent_device:
parent_entity = entities_by_address.get(node.parent_node.address)
if not parent_entity:
_LOGGER.error(
"Node %s has a parent node %s, but no device "
"was created for the parent. Skipping",
@ -115,13 +130,15 @@ async def async_setup_entry(
# These sensors use an optional "negative" subnode 2 to
# snag all state changes
if subnode_id == SUBNODE_NEGATIVE:
parent_device.add_negative_node(node)
assert isinstance(parent_entity, ISYInsteonBinarySensorEntity)
parent_entity.add_negative_node(node)
elif subnode_id == SUBNODE_HEARTBEAT:
assert isinstance(parent_entity, ISYInsteonBinarySensorEntity)
# Subnode 4 is the heartbeat node, which we will
# represent as a separate binary_sensor
device = ISYBinarySensorHeartbeat(node, parent_device)
parent_device.add_heartbeat_device(device)
devices.append(device)
entity = ISYBinarySensorHeartbeat(node, parent_entity)
parent_entity.add_heartbeat_device(entity)
entities.append(entity)
continue
if (
device_class == BinarySensorDeviceClass.MOTION
@ -133,48 +150,49 @@ async def async_setup_entry(
# the initial state is forced "OFF"/"NORMAL" if the
# parent device has a valid state. This is corrected
# upon connection to the ISY event stream if subnode has a valid state.
initial_state = None if parent_device.state is None else False
assert isinstance(parent_entity, ISYInsteonBinarySensorEntity)
initial_state = None if parent_entity.state is None else False
if subnode_id == SUBNODE_DUSK_DAWN:
# Subnode 2 is the Dusk/Dawn sensor
device = ISYInsteonBinarySensorEntity(
entity = ISYInsteonBinarySensorEntity(
node, BinarySensorDeviceClass.LIGHT
)
devices.append(device)
entities.append(entity)
continue
if subnode_id == SUBNODE_LOW_BATTERY:
# Subnode 3 is the low battery node
device = ISYInsteonBinarySensorEntity(
entity = ISYInsteonBinarySensorEntity(
node, BinarySensorDeviceClass.BATTERY, initial_state
)
devices.append(device)
entities.append(entity)
continue
if subnode_id in SUBNODE_TAMPER:
# Tamper Sub-node for MS II. Sometimes reported as "A" sometimes
# reported as "10", which translate from Hex to 10 and 16 resp.
device = ISYInsteonBinarySensorEntity(
entity = ISYInsteonBinarySensorEntity(
node, BinarySensorDeviceClass.PROBLEM, initial_state
)
devices.append(device)
entities.append(entity)
continue
if subnode_id in SUBNODE_MOTION_DISABLED:
# Motion Disabled Sub-node for MS II ("D" or "13")
device = ISYInsteonBinarySensorEntity(node)
devices.append(device)
entity = ISYInsteonBinarySensorEntity(node)
entities.append(entity)
continue
# We don't yet have any special logic for other sensor
# types, so add the nodes as individual devices
device = ISYBinarySensorEntity(node, device_class)
devices.append(device)
entity = ISYBinarySensorEntity(node, device_class)
entities.append(entity)
for name, status, _ in hass_isy_data[ISY994_PROGRAMS][BINARY_SENSOR]:
devices.append(ISYBinarySensorProgramEntity(name, status))
entities.append(ISYBinarySensorProgramEntity(name, status))
await migrate_old_unique_ids(hass, BINARY_SENSOR, devices)
async_add_entities(devices)
await migrate_old_unique_ids(hass, BINARY_SENSOR, entities)
async_add_entities(entities)
def _detect_device_type_and_class(node: Group | Node) -> (str, str):
def _detect_device_type_and_class(node: Group | Node) -> tuple[str | None, str | None]:
try:
device_type = node.type
except AttributeError:
@ -199,20 +217,25 @@ def _detect_device_type_and_class(node: Group | Node) -> (str, str):
class ISYBinarySensorEntity(ISYNodeEntity, BinarySensorEntity):
"""Representation of a basic ISY994 binary sensor device."""
def __init__(self, node, force_device_class=None, unknown_state=None) -> None:
def __init__(
self,
node: Node,
force_device_class: str | None = None,
unknown_state: bool | None = None,
) -> None:
"""Initialize the ISY994 binary sensor device."""
super().__init__(node)
self._device_class = force_device_class
@property
def is_on(self) -> bool:
def is_on(self) -> bool | None:
"""Get whether the ISY994 binary sensor device is on."""
if self._node.status == ISY_VALUE_UNKNOWN:
return None
return bool(self._node.status)
@property
def device_class(self) -> str:
def device_class(self) -> str | None:
"""Return the class of this device.
This was discovered by parsing the device type code during init
@ -229,11 +252,16 @@ class ISYInsteonBinarySensorEntity(ISYBinarySensorEntity):
Assistant entity and handles both ways that ISY binary sensors can work.
"""
def __init__(self, node, force_device_class=None, unknown_state=None) -> None:
def __init__(
self,
node: Node,
force_device_class: str | None = None,
unknown_state: bool | None = None,
) -> None:
"""Initialize the ISY994 binary sensor device."""
super().__init__(node, force_device_class)
self._negative_node = None
self._heartbeat_device = None
self._negative_node: Node | None = None
self._heartbeat_device: ISYBinarySensorHeartbeat | None = None
if self._node.status == ISY_VALUE_UNKNOWN:
self._computed_state = unknown_state
self._status_was_unknown = True
@ -252,21 +280,21 @@ class ISYInsteonBinarySensorEntity(ISYBinarySensorEntity):
self._async_negative_node_control_handler
)
def add_heartbeat_device(self, device) -> None:
def add_heartbeat_device(self, entity: ISYBinarySensorHeartbeat | None) -> None:
"""Register a heartbeat device for this sensor.
The heartbeat node beats on its own, but we can gain a little
reliability by considering any node activity for this sensor
to be a heartbeat as well.
"""
self._heartbeat_device = device
self._heartbeat_device = entity
def _async_heartbeat(self) -> None:
"""Send a heartbeat to our heartbeat device, if we have one."""
if self._heartbeat_device is not None:
self._heartbeat_device.async_heartbeat()
def add_negative_node(self, child) -> None:
def add_negative_node(self, child: Node) -> None:
"""Add a negative node to this binary sensor device.
The negative node is a node that can receive the 'off' events
@ -287,7 +315,7 @@ class ISYInsteonBinarySensorEntity(ISYBinarySensorEntity):
self._computed_state = None
@callback
def _async_negative_node_control_handler(self, event: object) -> None:
def _async_negative_node_control_handler(self, event: NodeProperty) -> None:
"""Handle an "On" control event from the "negative" node."""
if event.control == CMD_ON:
_LOGGER.debug(
@ -299,7 +327,7 @@ class ISYInsteonBinarySensorEntity(ISYBinarySensorEntity):
self._async_heartbeat()
@callback
def _async_positive_node_control_handler(self, event: object) -> None:
def _async_positive_node_control_handler(self, event: NodeProperty) -> None:
"""Handle On and Off control event coming from the primary node.
Depending on device configuration, sometimes only On events
@ -324,7 +352,7 @@ class ISYInsteonBinarySensorEntity(ISYBinarySensorEntity):
self._async_heartbeat()
@callback
def async_on_update(self, event: object) -> None:
def async_on_update(self, event: NodeProperty) -> None:
"""Primary node status updates.
We MOSTLY ignore these updates, as we listen directly to the Control
@ -341,7 +369,7 @@ class ISYInsteonBinarySensorEntity(ISYBinarySensorEntity):
self._async_heartbeat()
@property
def is_on(self) -> bool:
def is_on(self) -> bool | None:
"""Get whether the ISY994 binary sensor device is on.
Insteon leak sensors set their primary node to On when the state is
@ -361,7 +389,14 @@ class ISYInsteonBinarySensorEntity(ISYBinarySensorEntity):
class ISYBinarySensorHeartbeat(ISYNodeEntity, BinarySensorEntity):
"""Representation of the battery state of an ISY994 sensor."""
def __init__(self, node, parent_device) -> None:
def __init__(
self,
node: Node,
parent_device: ISYInsteonBinarySensorEntity
| ISYBinarySensorEntity
| ISYBinarySensorHeartbeat
| ISYBinarySensorProgramEntity,
) -> None:
"""Initialize the ISY994 binary sensor device.
Computed state is set to UNKNOWN unless the ISY provided a valid
@ -372,8 +407,8 @@ class ISYBinarySensorHeartbeat(ISYNodeEntity, BinarySensorEntity):
"""
super().__init__(node)
self._parent_device = parent_device
self._heartbeat_timer = None
self._computed_state = None
self._heartbeat_timer: CALLBACK_TYPE | None = None
self._computed_state: bool | None = None
if self.state is None:
self._computed_state = False
@ -386,7 +421,7 @@ class ISYBinarySensorHeartbeat(ISYNodeEntity, BinarySensorEntity):
# Start the timer on bootup, so we can change from UNKNOWN to OFF
self._restart_timer()
def _heartbeat_node_control_handler(self, event: object) -> None:
def _heartbeat_node_control_handler(self, event: NodeProperty) -> None:
"""Update the heartbeat timestamp when any ON/OFF event is sent.
The ISY uses both DON and DOF commands (alternating) for a heartbeat.
@ -395,7 +430,7 @@ class ISYBinarySensorHeartbeat(ISYNodeEntity, BinarySensorEntity):
self.async_heartbeat()
@callback
def async_heartbeat(self):
def async_heartbeat(self) -> None:
"""Mark the device as online, and restart the 25 hour timer.
This gets called when the heartbeat node beats, but also when the
@ -407,17 +442,14 @@ class ISYBinarySensorHeartbeat(ISYNodeEntity, BinarySensorEntity):
self._restart_timer()
self.async_write_ha_state()
def _restart_timer(self):
def _restart_timer(self) -> None:
"""Restart the 25 hour timer."""
try:
if self._heartbeat_timer is not None:
self._heartbeat_timer()
self._heartbeat_timer = None
except TypeError:
# No heartbeat timer is active
pass
@callback
def timer_elapsed(now) -> None:
def timer_elapsed(now: datetime) -> None:
"""Heartbeat missed; set state to ON to indicate dead battery."""
self._computed_state = True
self._heartbeat_timer = None
@ -457,7 +489,7 @@ class ISYBinarySensorHeartbeat(ISYNodeEntity, BinarySensorEntity):
return BinarySensorDeviceClass.BATTERY
@property
def extra_state_attributes(self):
def extra_state_attributes(self) -> dict[str, Any]:
"""Get the state attributes for the device."""
attr = super().extra_state_attributes
attr["parent_entity_id"] = self._parent_device.entity_id