Compare commits

...

1 Commits

Author SHA1 Message Date
Daniel Hjelseth Høyer
81cd811150 Refactor Tibber electricity price sensor to use coordinator pattern
Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
2026-03-02 21:45:01 +01:00
2 changed files with 156 additions and 60 deletions

View File

@@ -2,9 +2,9 @@
from __future__ import annotations
from datetime import timedelta
from datetime import datetime, timedelta
import logging
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, TypedDict, cast
from aiohttp.client_exceptions import ClientError
import tibber
@@ -38,6 +38,39 @@ FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__)
class TibberHomeData(TypedDict, total=False):
"""Data for a Tibber home used by the price sensor."""
current_price: float | None
last_updated: datetime | None
intraday_price_ranking: float | None
attributes: dict[str, float]
price_unit: str
app_nickname: str | None
grid_company: str | None
estimated_annual_consumption: int | None
def _build_home_data(home: tibber.TibberHome) -> TibberHomeData:
"""Build TibberHomeData from a TibberHome for the price sensor."""
current_price, last_updated, price_rank = home.current_price_data()
attributes = home.current_attributes()
result: TibberHomeData = {
"current_price": current_price,
"last_updated": last_updated,
"intraday_price_ranking": price_rank,
"attributes": attributes,
"price_unit": home.price_unit,
}
data = home.info["viewer"]["home"]
result["app_nickname"] = data.get("appNickname")
result["grid_company"] = data.get("meteringPointData")["gridCompany"]
result["estimated_annual_consumption"] = data["meteringPointData"][
"estimatedAnnualConsumption"
]
return result
class TibberDataCoordinator(DataUpdateCoordinator[None]):
"""Handle Tibber data and insert statistics."""
@@ -57,13 +90,16 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
name=f"Tibber {tibber_connection.name}",
update_interval=timedelta(minutes=20),
)
self._tibber_connection = tibber_connection
async def _async_update_data(self) -> None:
"""Update data via API."""
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
try:
await self._tibber_connection.fetch_consumption_data_active_homes()
await self._tibber_connection.fetch_production_data_active_homes()
await tibber_connection.fetch_consumption_data_active_homes()
await tibber_connection.fetch_production_data_active_homes()
await self._insert_statistics()
except tibber.RetryableHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
@@ -75,7 +111,10 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
async def _insert_statistics(self) -> None:
"""Insert Tibber statistics."""
for home in self._tibber_connection.get_homes():
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
for home in tibber_connection.get_homes():
sensors: list[tuple[str, bool, str | None, str]] = []
if home.hourly_consumption_data:
sensors.append(
@@ -194,6 +233,74 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
async_add_external_statistics(self.hass, metadata, statistics)
class TibberPriceCoordinator(DataUpdateCoordinator[dict[str, TibberHomeData]]):
"""Handle Tibber price data and insert statistics."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: TibberConfigEntry,
) -> None:
"""Initialize the price coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=f"{DOMAIN} price",
update_interval=timedelta(minutes=1),
)
def _seconds_until_next_15_minute(self) -> float:
"""Return seconds until the next 15-minute boundary (0, 15, 30, 45) in UTC."""
now = dt_util.utcnow()
next_minute = ((now.minute // 15) + 1) * 15
if next_minute >= 60:
next_run = now.replace(minute=0, second=0, microsecond=0) + timedelta(
hours=1
)
else:
next_run = now.replace(
minute=next_minute, second=0, microsecond=0, tzinfo=dt_util.UTC
)
return (next_run - now).total_seconds()
async def _async_update_data(self) -> dict[str, TibberHomeData]:
"""Update data via API and return per-home data for sensors."""
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
try:
await tibber_connection.fetch_consumption_data_active_homes()
await tibber_connection.fetch_production_data_active_homes()
now = dt_util.now()
for home in tibber_connection.get_homes(only_active=True):
update_needed = False
last_data_timestamp = home.last_data_timestamp
if last_data_timestamp is None:
update_needed = True
else:
remaining_seconds = (last_data_timestamp - now).total_seconds()
if remaining_seconds < 11 * 3600:
update_needed = True
if update_needed:
await home.update_info_and_price_info()
except tibber.RetryableHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
except tibber.FatalHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
result: dict[str, TibberHomeData] = {}
for home in tibber_connection.get_homes(only_active=True):
result[home.home_id] = _build_home_data(home)
self.update_interval = timedelta(seconds=self._seconds_until_next_15_minute())
return result
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
"""Fetch and cache Tibber Data API device capabilities."""

View File

@@ -3,10 +3,8 @@
from __future__ import annotations
from collections.abc import Callable
import datetime
from datetime import timedelta
import logging
from random import randrange
from typing import Any
import aiohttp
@@ -42,18 +40,20 @@ from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from homeassistant.util import Throttle, dt as dt_util
from homeassistant.util import dt as dt_util
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
from .coordinator import (
TibberDataAPICoordinator,
TibberDataCoordinator,
TibberPriceCoordinator,
)
_LOGGER = logging.getLogger(__name__)
ICON = "mdi:currency-usd"
SCAN_INTERVAL = timedelta(minutes=1)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
PARALLEL_UPDATES = 0
TWENTY_MINUTES = 20 * 60
RT_SENSORS_UNIQUE_ID_MIGRATION = {
"accumulated_consumption_last_hour": "accumulated consumption current hour",
@@ -610,6 +610,7 @@ async def _async_setup_graphql_sensors(
entity_registry = er.async_get(hass)
coordinator: TibberDataCoordinator | None = None
price_coordinator: TibberPriceCoordinator | None = None
entities: list[TibberSensor] = []
for home in tibber_connection.get_homes(only_active=False):
try:
@@ -626,7 +627,9 @@ async def _async_setup_graphql_sensors(
raise PlatformNotReady from err
if home.has_active_subscription:
entities.append(TibberSensorElPrice(home))
if price_coordinator is None:
price_coordinator = TibberPriceCoordinator(hass, entry)
entities.append(TibberSensorElPrice(price_coordinator, home))
if coordinator is None:
coordinator = TibberDataCoordinator(hass, entry, tibber_connection)
entities.extend(
@@ -737,18 +740,19 @@ class TibberSensor(SensorEntity):
return device_info
class TibberSensorElPrice(TibberSensor):
class TibberSensorElPrice(TibberSensor, CoordinatorEntity[TibberPriceCoordinator]):
"""Representation of a Tibber sensor for el price."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_translation_key = "electricity_price"
def __init__(self, tibber_home: TibberHome) -> None:
def __init__(
self,
coordinator: TibberPriceCoordinator,
tibber_home: TibberHome,
) -> None:
"""Initialize the sensor."""
super().__init__(tibber_home=tibber_home)
self._last_updated: datetime.datetime | None = None
self._spread_load_constant = randrange(TWENTY_MINUTES)
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
self._attr_available = False
self._attr_extra_state_attributes = {
"app_nickname": None,
@@ -768,51 +772,36 @@ class TibberSensorElPrice(TibberSensor):
self._device_name = self._home_name
async def async_update(self) -> None:
"""Get the latest data and updates the states."""
now = dt_util.now()
if (
not self._tibber_home.last_data_timestamp
or (self._tibber_home.last_data_timestamp - now).total_seconds()
< 10 * 3600 - self._spread_load_constant
or not self.available
):
_LOGGER.debug("Asking for new data")
await self._fetch_data()
elif (
self._tibber_home.price_total
and self._last_updated
and self._last_updated.hour == now.hour
and now - self._last_updated < timedelta(minutes=15)
and self._tibber_home.last_data_timestamp
):
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
data = self.coordinator.data
if not data:
self._attr_available = False
return
res = self._tibber_home.current_price_data()
self._attr_native_value, self._last_updated, price_rank = res
self._attr_extra_state_attributes["intraday_price_ranking"] = price_rank
home_data = data.get(self._tibber_home.home_id)
if home_data is None:
self._attr_available = False
return
attrs = self._tibber_home.current_attributes()
self._attr_extra_state_attributes.update(attrs)
self._attr_native_value = home_data.get("current_price")
self._attr_extra_state_attributes["intraday_price_ranking"] = home_data.get(
"intraday_price_ranking"
)
if attrs := home_data.get("attributes"):
self._attr_extra_state_attributes.update(attrs)
self._attr_native_unit_of_measurement = home_data.get("price_unit", "")
if (app_nickname := home_data.get("app_nickname")) is not None:
self._attr_extra_state_attributes["app_nickname"] = app_nickname
if (grid_company := home_data.get("grid_company")) is not None:
self._attr_extra_state_attributes["grid_company"] = grid_company
if (est_annual := home_data.get("estimated_annual_consumption")) is not None:
self._attr_extra_state_attributes["estimated_annual_consumption"] = (
est_annual
)
self._attr_available = self._attr_native_value is not None
self._attr_native_unit_of_measurement = self._tibber_home.price_unit
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def _fetch_data(self) -> None:
_LOGGER.debug("Fetching data")
try:
await self._tibber_home.update_info_and_price_info()
except TimeoutError, aiohttp.ClientError:
return
data = self._tibber_home.info["viewer"]["home"]
self._attr_extra_state_attributes["app_nickname"] = data["appNickname"]
self._attr_extra_state_attributes["grid_company"] = data["meteringPointData"][
"gridCompany"
]
self._attr_extra_state_attributes["estimated_annual_consumption"] = data[
"meteringPointData"
]["estimatedAnnualConsumption"]
self.async_write_ha_state()
class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):