Add Flume DataUpdateCoordinator class (#77114)

Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
Jeef
2022-08-25 10:21:41 -06:00
committed by GitHub
parent ac9ba8f231
commit 462ec4ced3
6 changed files with 61 additions and 55 deletions

View File

@ -393,6 +393,7 @@ omit =
homeassistant/components/flick_electric/sensor.py
homeassistant/components/flock/notify.py
homeassistant/components/flume/__init__.py
homeassistant/components/flume/coordinator.py
homeassistant/components/flume/sensor.py
homeassistant/components/flunearyou/__init__.py
homeassistant/components/flunearyou/repairs.py

View File

@ -349,8 +349,8 @@ build.json @home-assistant/supervisor
/tests/components/flipr/ @cnico
/homeassistant/components/flo/ @dmulcahey
/tests/components/flo/ @dmulcahey
/homeassistant/components/flume/ @ChrisMandich @bdraco
/tests/components/flume/ @ChrisMandich @bdraco
/homeassistant/components/flume/ @ChrisMandich @bdraco @jeeftor
/tests/components/flume/ @ChrisMandich @bdraco @jeeftor
/homeassistant/components/flunearyou/ @bachya
/tests/components/flunearyou/ @bachya
/homeassistant/components/flux_led/ @icemanch @bdraco

View File

@ -1,6 +1,9 @@
"""The Flume component."""
from __future__ import annotations
from datetime import timedelta
import logging
from homeassistant.components.sensor import SensorEntityDescription
from homeassistant.const import Platform
@ -10,6 +13,11 @@ PLATFORMS = [Platform.SENSOR]
DEFAULT_NAME = "Flume Sensor"
NOTIFICATION_SCAN_INTERVAL = timedelta(minutes=1)
DEVICE_SCAN_INTERVAL = timedelta(minutes=1)
_LOGGER = logging.getLogger(__package__)
FLUME_TYPE_SENSOR = 2
FLUME_QUERIES_SENSOR: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(

View File

@ -0,0 +1,35 @@
"""The IntelliFire integration."""
from __future__ import annotations
from homeassistant.core import HomeAssistant
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import _LOGGER, DEVICE_SCAN_INTERVAL, DOMAIN
class FlumeDeviceDataUpdateCoordinator(DataUpdateCoordinator[None]):
"""Data update coordinator for an individual flume device."""
def __init__(self, hass: HomeAssistant, flume_device) -> None:
"""Initialize the Coordinator."""
super().__init__(
hass,
name=DOMAIN,
logger=_LOGGER,
update_interval=DEVICE_SCAN_INTERVAL,
)
self.flume_device = flume_device
async def _async_update_data(self) -> None:
"""Get the latest data from the Flume."""
_LOGGER.debug("Updating Flume data")
try:
await self.hass.async_add_executor_job(self.flume_device.update_force)
except Exception as ex:
raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex
_LOGGER.debug(
"Flume update details: values=%s query_payload=%s",
self.flume_device.values,
self.flume_device.query_payload,
)

View File

@ -3,7 +3,7 @@
"name": "Flume",
"documentation": "https://www.home-assistant.io/integrations/flume/",
"requirements": ["pyflume==0.6.5"],
"codeowners": ["@ChrisMandich", "@bdraco"],
"codeowners": ["@ChrisMandich", "@bdraco", "@jeeftor"],
"config_flow": true,
"dhcp": [
{

View File

@ -1,6 +1,4 @@
"""Sensor for displaying the number of result from Flume."""
from datetime import timedelta
import logging
from numbers import Number
from pyflume import FlumeData
@ -11,14 +9,11 @@ from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
UpdateFailed,
)
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from .const import (
DEFAULT_NAME,
DEVICE_SCAN_INTERVAL,
DOMAIN,
FLUME_AUTH,
FLUME_DEVICES,
@ -31,11 +26,7 @@ from .const import (
KEY_DEVICE_LOCATION_TIMEZONE,
KEY_DEVICE_TYPE,
)
_LOGGER = logging.getLogger(__name__)
MIN_TIME_BETWEEN_UPDATES = timedelta(seconds=15)
SCAN_INTERVAL = timedelta(minutes=1)
from .coordinator import FlumeDeviceDataUpdateCoordinator
async def async_setup_entry(
@ -62,25 +53,27 @@ async def async_setup_entry(
device_name = device[KEY_DEVICE_LOCATION][KEY_DEVICE_LOCATION_NAME]
device_timezone = device[KEY_DEVICE_LOCATION][KEY_DEVICE_LOCATION_TIMEZONE]
device_friendly_name = f"{name} {device_name}"
flume_device = FlumeData(
flume_auth,
device_id,
device_timezone,
SCAN_INTERVAL,
scan_interval=DEVICE_SCAN_INTERVAL,
update_on_init=False,
http_session=http_session,
)
coordinator = _create_flume_device_coordinator(hass, flume_device)
coordinator = FlumeDeviceDataUpdateCoordinator(
hass=hass, flume_device=flume_device
)
flume_entity_list.extend(
[
FlumeSensor(
coordinator,
flume_device,
device_friendly_name,
device_id,
description,
coordinator=coordinator,
description=description,
name=device_friendly_name,
device_id=device_id,
)
for description in FLUME_QUERIES_SENSOR
]
@ -96,7 +89,6 @@ class FlumeSensor(CoordinatorEntity, SensorEntity):
def __init__(
self,
coordinator,
flume_device,
name,
device_id,
description: SensorEntityDescription,
@ -104,7 +96,6 @@ class FlumeSensor(CoordinatorEntity, SensorEntity):
"""Initialize the Flume sensor."""
super().__init__(coordinator)
self.entity_description = description
self._flume_device = flume_device
self._attr_name = f"{name} {description.name}"
self._attr_unique_id = f"{description.key}_{device_id}"
@ -119,10 +110,10 @@ class FlumeSensor(CoordinatorEntity, SensorEntity):
def native_value(self):
"""Return the state of the sensor."""
sensor_key = self.entity_description.key
if sensor_key not in self._flume_device.values:
if sensor_key not in self.coordinator.flume_device.values:
return None
return _format_state_value(self._flume_device.values[sensor_key])
return _format_state_value(self.coordinator.flume_device.values[sensor_key])
async def async_added_to_hass(self) -> None:
"""Request an update when added."""
@ -134,32 +125,3 @@ class FlumeSensor(CoordinatorEntity, SensorEntity):
def _format_state_value(value):
return round(value, 1) if isinstance(value, Number) else None
def _create_flume_device_coordinator(hass, flume_device):
"""Create a data coordinator for the flume device."""
async def _async_update_data():
"""Get the latest data from the Flume."""
_LOGGER.debug("Updating Flume data")
try:
await hass.async_add_executor_job(flume_device.update_force)
except Exception as ex:
raise UpdateFailed(f"Error communicating with flume API: {ex}") from ex
_LOGGER.debug(
"Flume update details: %s",
{
"values": flume_device.values,
"query_payload": flume_device.query_payload,
},
)
return DataUpdateCoordinator(
hass,
_LOGGER,
# Name of the data. For logging purposes.
name=flume_device.device_id,
update_method=_async_update_data,
# Polling interval. Will only be polled if there are subscribers.
update_interval=SCAN_INTERVAL,
)