Compare commits

...

5 Commits

Author SHA1 Message Date
Daniel Hjelseth Høyer
15b4d07c13 fix test
Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
2026-03-04 07:09:29 +01:00
Daniel Hjelseth Høyer
6aeb95c25c minor
Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
2026-03-03 20:37:30 +01:00
Daniel Hjelseth Høyer
b502409a4c Refactor Tibber electricity price sensor to use coordinator pattern
Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
2026-03-03 08:48:50 +01:00
Daniel Hjelseth Høyer
b925045cc4 Refactor Tibber electricity price sensor to use coordinator pattern
Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
2026-03-03 07:09:59 +01:00
Daniel Hjelseth Høyer
81cd811150 Refactor Tibber electricity price sensor to use coordinator pattern
Signed-off-by: Daniel Hjelseth Høyer <github@dahoiv.net>
2026-03-02 21:45:01 +01:00
3 changed files with 179 additions and 60 deletions

View File

@@ -2,9 +2,9 @@
from __future__ import annotations
from datetime import timedelta
from datetime import datetime, timedelta
import logging
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, TypedDict, cast
from aiohttp.client_exceptions import ClientError
import tibber
@@ -38,6 +38,58 @@ FIVE_YEARS = 5 * 365 * 24
_LOGGER = logging.getLogger(__name__)
class TibberHomeData(TypedDict):
"""Data for a Tibber home used by the price sensor."""
currency: str
price_unit: str
current_price: float | None
current_price_time: datetime | None
intraday_price_ranking: float | None
max_price: float
avg_price: float
min_price: float
off_peak_1: float
peak: float
off_peak_2: float
month_cost: float | None
peak_hour: float | None
peak_hour_time: datetime | None
month_cons: float | None
app_nickname: str | None
grid_company: str | None
estimated_annual_consumption: int | None
def _build_home_data(home: tibber.TibberHome) -> TibberHomeData:
"""Build TibberHomeData from a TibberHome for the price sensor."""
current_price, last_updated, price_rank = home.current_price_data()
attributes = home.current_attributes()
result: TibberHomeData = {
"currency": home.currency,
"price_unit": home.price_unit,
"current_price": current_price,
"current_price_time": last_updated,
"intraday_price_ranking": price_rank,
"max_price": attributes["max_price"],
"avg_price": attributes["avg_price"],
"min_price": attributes["min_price"],
"off_peak_1": attributes["off_peak_1"],
"peak": attributes["peak"],
"off_peak_2": attributes["off_peak_2"],
"month_cost": home.month_cost,
"peak_hour": home.peak_hour,
"peak_hour_time": home.peak_hour_time,
"month_cons": home.month_cons,
"app_nickname": home.info["viewer"]["home"].get("appNickname"),
"grid_company": home.info["viewer"]["home"]["meteringPointData"]["gridCompany"],
"estimated_annual_consumption": home.info["viewer"]["home"][
"meteringPointData"
]["estimatedAnnualConsumption"],
}
return result
class TibberDataCoordinator(DataUpdateCoordinator[None]):
"""Handle Tibber data and insert statistics."""
@@ -57,13 +109,16 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
name=f"Tibber {tibber_connection.name}",
update_interval=timedelta(minutes=20),
)
self._tibber_connection = tibber_connection
async def _async_update_data(self) -> None:
"""Update data via API."""
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
try:
await self._tibber_connection.fetch_consumption_data_active_homes()
await self._tibber_connection.fetch_production_data_active_homes()
await tibber_connection.fetch_consumption_data_active_homes()
await tibber_connection.fetch_production_data_active_homes()
await self._insert_statistics()
except tibber.RetryableHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
@@ -75,7 +130,10 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
async def _insert_statistics(self) -> None:
"""Insert Tibber statistics."""
for home in self._tibber_connection.get_homes():
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
for home in tibber_connection.get_homes():
sensors: list[tuple[str, bool, str | None, str]] = []
if home.hourly_consumption_data:
sensors.append(
@@ -194,6 +252,74 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
async_add_external_statistics(self.hass, metadata, statistics)
class TibberPriceCoordinator(DataUpdateCoordinator[dict[str, TibberHomeData]]):
"""Handle Tibber price data and insert statistics."""
config_entry: TibberConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: TibberConfigEntry,
) -> None:
"""Initialize the price coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=f"{DOMAIN} price",
update_interval=timedelta(minutes=1),
)
def _seconds_until_next_15_minute(self) -> float:
"""Return seconds until the next 15-minute boundary (0, 15, 30, 45) in UTC."""
now = dt_util.utcnow()
next_minute = ((now.minute // 15) + 1) * 15
if next_minute >= 60:
next_run = now.replace(minute=0, second=0, microsecond=0) + timedelta(
hours=1
)
else:
next_run = now.replace(
minute=next_minute, second=0, microsecond=0, tzinfo=dt_util.UTC
)
return (next_run - now).total_seconds()
async def _async_update_data(self) -> dict[str, TibberHomeData]:
"""Update data via API and return per-home data for sensors."""
tibber_connection = await self.config_entry.runtime_data.async_get_client(
self.hass
)
try:
await tibber_connection.fetch_consumption_data_active_homes()
await tibber_connection.fetch_production_data_active_homes()
now = dt_util.now()
for home in tibber_connection.get_homes(only_active=True):
update_needed = False
last_data_timestamp = home.last_data_timestamp
if last_data_timestamp is None:
update_needed = True
else:
remaining_seconds = (last_data_timestamp - now).total_seconds()
if remaining_seconds < 11 * 3600:
update_needed = True
if update_needed:
await home.update_info_and_price_info()
except tibber.RetryableHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
except tibber.FatalHttpExceptionError as err:
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
result: dict[str, TibberHomeData] = {}
for home in tibber_connection.get_homes(only_active=True):
result[home.home_id] = _build_home_data(home)
self.update_interval = timedelta(seconds=self._seconds_until_next_15_minute())
return result
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
"""Fetch and cache Tibber Data API device capabilities."""

View File

@@ -3,10 +3,8 @@
from __future__ import annotations
from collections.abc import Callable
import datetime
from datetime import timedelta
import logging
from random import randrange
from typing import Any
import aiohttp
@@ -42,18 +40,20 @@ from homeassistant.helpers.update_coordinator import (
CoordinatorEntity,
DataUpdateCoordinator,
)
from homeassistant.util import Throttle, dt as dt_util
from homeassistant.util import dt as dt_util
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
from .coordinator import (
TibberDataAPICoordinator,
TibberDataCoordinator,
TibberPriceCoordinator,
)
_LOGGER = logging.getLogger(__name__)
ICON = "mdi:currency-usd"
SCAN_INTERVAL = timedelta(minutes=1)
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
PARALLEL_UPDATES = 0
TWENTY_MINUTES = 20 * 60
RT_SENSORS_UNIQUE_ID_MIGRATION = {
"accumulated_consumption_last_hour": "accumulated consumption current hour",
@@ -610,6 +610,7 @@ async def _async_setup_graphql_sensors(
entity_registry = er.async_get(hass)
coordinator: TibberDataCoordinator | None = None
price_coordinator: TibberPriceCoordinator | None = None
entities: list[TibberSensor] = []
for home in tibber_connection.get_homes(only_active=False):
try:
@@ -626,7 +627,9 @@ async def _async_setup_graphql_sensors(
raise PlatformNotReady from err
if home.has_active_subscription:
entities.append(TibberSensorElPrice(home))
if price_coordinator is None:
price_coordinator = TibberPriceCoordinator(hass, entry)
entities.append(TibberSensorElPrice(price_coordinator, home))
if coordinator is None:
coordinator = TibberDataCoordinator(hass, entry, tibber_connection)
entities.extend(
@@ -737,18 +740,19 @@ class TibberSensor(SensorEntity):
return device_info
class TibberSensorElPrice(TibberSensor):
class TibberSensorElPrice(TibberSensor, CoordinatorEntity[TibberPriceCoordinator]):
"""Representation of a Tibber sensor for el price."""
_attr_state_class = SensorStateClass.MEASUREMENT
_attr_translation_key = "electricity_price"
def __init__(self, tibber_home: TibberHome) -> None:
def __init__(
self,
coordinator: TibberPriceCoordinator,
tibber_home: TibberHome,
) -> None:
"""Initialize the sensor."""
super().__init__(tibber_home=tibber_home)
self._last_updated: datetime.datetime | None = None
self._spread_load_constant = randrange(TWENTY_MINUTES)
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
self._attr_available = False
self._attr_extra_state_attributes = {
"app_nickname": None,
@@ -768,51 +772,36 @@ class TibberSensorElPrice(TibberSensor):
self._device_name = self._home_name
async def async_update(self) -> None:
"""Get the latest data and updates the states."""
now = dt_util.now()
if (
not self._tibber_home.last_data_timestamp
or (self._tibber_home.last_data_timestamp - now).total_seconds()
< 10 * 3600 - self._spread_load_constant
or not self.available
):
_LOGGER.debug("Asking for new data")
await self._fetch_data()
elif (
self._tibber_home.price_total
and self._last_updated
and self._last_updated.hour == now.hour
and now - self._last_updated < timedelta(minutes=15)
and self._tibber_home.last_data_timestamp
):
@callback
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
data = self.coordinator.data
if not data:
self._attr_available = False
return
res = self._tibber_home.current_price_data()
self._attr_native_value, self._last_updated, price_rank = res
self._attr_extra_state_attributes["intraday_price_ranking"] = price_rank
attrs = self._tibber_home.current_attributes()
self._attr_extra_state_attributes.update(attrs)
self._attr_available = self._attr_native_value is not None
self._attr_native_unit_of_measurement = self._tibber_home.price_unit
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def _fetch_data(self) -> None:
_LOGGER.debug("Fetching data")
try:
await self._tibber_home.update_info_and_price_info()
except TimeoutError, aiohttp.ClientError:
home_data = data.get(self._tibber_home.home_id)
if home_data is None:
self._attr_available = False
return
data = self._tibber_home.info["viewer"]["home"]
self._attr_extra_state_attributes["app_nickname"] = data["appNickname"]
self._attr_extra_state_attributes["grid_company"] = data["meteringPointData"][
"gridCompany"
self._attr_native_value = home_data.get("current_price")
self._attr_extra_state_attributes["intraday_price_ranking"] = home_data.get(
"intraday_price_ranking"
)
self._attr_extra_state_attributes["max_price"] = home_data["max_price"]
self._attr_extra_state_attributes["avg_price"] = home_data["avg_price"]
self._attr_extra_state_attributes["min_price"] = home_data["min_price"]
self._attr_extra_state_attributes["off_peak_1"] = home_data["off_peak_1"]
self._attr_extra_state_attributes["peak"] = home_data["peak"]
self._attr_extra_state_attributes["off_peak_2"] = home_data["off_peak_2"]
self._attr_extra_state_attributes["app_nickname"] = home_data["app_nickname"]
self._attr_extra_state_attributes["grid_company"] = home_data["grid_company"]
self._attr_extra_state_attributes["estimated_annual_consumption"] = home_data[
"estimated_annual_consumption"
]
self._attr_extra_state_attributes["estimated_annual_consumption"] = data[
"meteringPointData"
]["estimatedAnnualConsumption"]
self._attr_available = self._attr_native_value is not None
self.async_write_ha_state()
class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):

View File

@@ -24,6 +24,10 @@ async def test_async_setup_entry(
tibber_connection.fetch_production_data_active_homes.return_value = None
tibber_connection.get_homes = mock_get_homes
runtime_data = AsyncMock()
runtime_data.async_get_client.return_value = tibber_connection
config_entry.runtime_data = runtime_data
coordinator = TibberDataCoordinator(hass, config_entry, tibber_connection)
await coordinator._async_update_data()
await async_wait_recording_done(hass)