mirror of
https://github.com/home-assistant/core.git
synced 2026-03-03 06:17:01 +01:00
Compare commits
1 Commits
trigger/al
...
tibber_ref
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
81cd811150 |
@@ -2,9 +2,9 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, cast
|
||||
from typing import TYPE_CHECKING, TypedDict, cast
|
||||
|
||||
from aiohttp.client_exceptions import ClientError
|
||||
import tibber
|
||||
@@ -38,6 +38,39 @@ FIVE_YEARS = 5 * 365 * 24
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TibberHomeData(TypedDict, total=False):
|
||||
"""Data for a Tibber home used by the price sensor."""
|
||||
|
||||
current_price: float | None
|
||||
last_updated: datetime | None
|
||||
intraday_price_ranking: float | None
|
||||
attributes: dict[str, float]
|
||||
price_unit: str
|
||||
app_nickname: str | None
|
||||
grid_company: str | None
|
||||
estimated_annual_consumption: int | None
|
||||
|
||||
|
||||
def _build_home_data(home: tibber.TibberHome) -> TibberHomeData:
|
||||
"""Build TibberHomeData from a TibberHome for the price sensor."""
|
||||
current_price, last_updated, price_rank = home.current_price_data()
|
||||
attributes = home.current_attributes()
|
||||
result: TibberHomeData = {
|
||||
"current_price": current_price,
|
||||
"last_updated": last_updated,
|
||||
"intraday_price_ranking": price_rank,
|
||||
"attributes": attributes,
|
||||
"price_unit": home.price_unit,
|
||||
}
|
||||
data = home.info["viewer"]["home"]
|
||||
result["app_nickname"] = data.get("appNickname")
|
||||
result["grid_company"] = data.get("meteringPointData")["gridCompany"]
|
||||
result["estimated_annual_consumption"] = data["meteringPointData"][
|
||||
"estimatedAnnualConsumption"
|
||||
]
|
||||
return result
|
||||
|
||||
|
||||
class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
"""Handle Tibber data and insert statistics."""
|
||||
|
||||
@@ -57,13 +90,16 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
name=f"Tibber {tibber_connection.name}",
|
||||
update_interval=timedelta(minutes=20),
|
||||
)
|
||||
self._tibber_connection = tibber_connection
|
||||
|
||||
async def _async_update_data(self) -> None:
|
||||
"""Update data via API."""
|
||||
tibber_connection = await self.config_entry.runtime_data.async_get_client(
|
||||
self.hass
|
||||
)
|
||||
|
||||
try:
|
||||
await self._tibber_connection.fetch_consumption_data_active_homes()
|
||||
await self._tibber_connection.fetch_production_data_active_homes()
|
||||
await tibber_connection.fetch_consumption_data_active_homes()
|
||||
await tibber_connection.fetch_production_data_active_homes()
|
||||
await self._insert_statistics()
|
||||
except tibber.RetryableHttpExceptionError as err:
|
||||
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||
@@ -75,7 +111,10 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
|
||||
async def _insert_statistics(self) -> None:
|
||||
"""Insert Tibber statistics."""
|
||||
for home in self._tibber_connection.get_homes():
|
||||
tibber_connection = await self.config_entry.runtime_data.async_get_client(
|
||||
self.hass
|
||||
)
|
||||
for home in tibber_connection.get_homes():
|
||||
sensors: list[tuple[str, bool, str | None, str]] = []
|
||||
if home.hourly_consumption_data:
|
||||
sensors.append(
|
||||
@@ -194,6 +233,74 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
async_add_external_statistics(self.hass, metadata, statistics)
|
||||
|
||||
|
||||
class TibberPriceCoordinator(DataUpdateCoordinator[dict[str, TibberHomeData]]):
|
||||
"""Handle Tibber price data and insert statistics."""
|
||||
|
||||
config_entry: TibberConfigEntry
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
config_entry: TibberConfigEntry,
|
||||
) -> None:
|
||||
"""Initialize the price coordinator."""
|
||||
super().__init__(
|
||||
hass,
|
||||
_LOGGER,
|
||||
config_entry=config_entry,
|
||||
name=f"{DOMAIN} price",
|
||||
update_interval=timedelta(minutes=1),
|
||||
)
|
||||
|
||||
def _seconds_until_next_15_minute(self) -> float:
|
||||
"""Return seconds until the next 15-minute boundary (0, 15, 30, 45) in UTC."""
|
||||
now = dt_util.utcnow()
|
||||
next_minute = ((now.minute // 15) + 1) * 15
|
||||
if next_minute >= 60:
|
||||
next_run = now.replace(minute=0, second=0, microsecond=0) + timedelta(
|
||||
hours=1
|
||||
)
|
||||
else:
|
||||
next_run = now.replace(
|
||||
minute=next_minute, second=0, microsecond=0, tzinfo=dt_util.UTC
|
||||
)
|
||||
return (next_run - now).total_seconds()
|
||||
|
||||
async def _async_update_data(self) -> dict[str, TibberHomeData]:
|
||||
"""Update data via API and return per-home data for sensors."""
|
||||
tibber_connection = await self.config_entry.runtime_data.async_get_client(
|
||||
self.hass
|
||||
)
|
||||
try:
|
||||
await tibber_connection.fetch_consumption_data_active_homes()
|
||||
await tibber_connection.fetch_production_data_active_homes()
|
||||
now = dt_util.now()
|
||||
for home in tibber_connection.get_homes(only_active=True):
|
||||
update_needed = False
|
||||
last_data_timestamp = home.last_data_timestamp
|
||||
|
||||
if last_data_timestamp is None:
|
||||
update_needed = True
|
||||
else:
|
||||
remaining_seconds = (last_data_timestamp - now).total_seconds()
|
||||
if remaining_seconds < 11 * 3600:
|
||||
update_needed = True
|
||||
|
||||
if update_needed:
|
||||
await home.update_info_and_price_info()
|
||||
except tibber.RetryableHttpExceptionError as err:
|
||||
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||
except tibber.FatalHttpExceptionError as err:
|
||||
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||
|
||||
result: dict[str, TibberHomeData] = {}
|
||||
for home in tibber_connection.get_homes(only_active=True):
|
||||
result[home.home_id] = _build_home_data(home)
|
||||
|
||||
self.update_interval = timedelta(seconds=self._seconds_until_next_15_minute())
|
||||
return result
|
||||
|
||||
|
||||
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
|
||||
"""Fetch and cache Tibber Data API device capabilities."""
|
||||
|
||||
|
||||
@@ -3,10 +3,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import datetime
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
from random import randrange
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
@@ -42,18 +40,20 @@ from homeassistant.helpers.update_coordinator import (
|
||||
CoordinatorEntity,
|
||||
DataUpdateCoordinator,
|
||||
)
|
||||
from homeassistant.util import Throttle, dt as dt_util
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
|
||||
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
|
||||
from .coordinator import (
|
||||
TibberDataAPICoordinator,
|
||||
TibberDataCoordinator,
|
||||
TibberPriceCoordinator,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
ICON = "mdi:currency-usd"
|
||||
SCAN_INTERVAL = timedelta(minutes=1)
|
||||
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
|
||||
PARALLEL_UPDATES = 0
|
||||
TWENTY_MINUTES = 20 * 60
|
||||
|
||||
RT_SENSORS_UNIQUE_ID_MIGRATION = {
|
||||
"accumulated_consumption_last_hour": "accumulated consumption current hour",
|
||||
@@ -610,6 +610,7 @@ async def _async_setup_graphql_sensors(
|
||||
entity_registry = er.async_get(hass)
|
||||
|
||||
coordinator: TibberDataCoordinator | None = None
|
||||
price_coordinator: TibberPriceCoordinator | None = None
|
||||
entities: list[TibberSensor] = []
|
||||
for home in tibber_connection.get_homes(only_active=False):
|
||||
try:
|
||||
@@ -626,7 +627,9 @@ async def _async_setup_graphql_sensors(
|
||||
raise PlatformNotReady from err
|
||||
|
||||
if home.has_active_subscription:
|
||||
entities.append(TibberSensorElPrice(home))
|
||||
if price_coordinator is None:
|
||||
price_coordinator = TibberPriceCoordinator(hass, entry)
|
||||
entities.append(TibberSensorElPrice(price_coordinator, home))
|
||||
if coordinator is None:
|
||||
coordinator = TibberDataCoordinator(hass, entry, tibber_connection)
|
||||
entities.extend(
|
||||
@@ -737,18 +740,19 @@ class TibberSensor(SensorEntity):
|
||||
return device_info
|
||||
|
||||
|
||||
class TibberSensorElPrice(TibberSensor):
|
||||
class TibberSensorElPrice(TibberSensor, CoordinatorEntity[TibberPriceCoordinator]):
|
||||
"""Representation of a Tibber sensor for el price."""
|
||||
|
||||
_attr_state_class = SensorStateClass.MEASUREMENT
|
||||
_attr_translation_key = "electricity_price"
|
||||
|
||||
def __init__(self, tibber_home: TibberHome) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: TibberPriceCoordinator,
|
||||
tibber_home: TibberHome,
|
||||
) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(tibber_home=tibber_home)
|
||||
self._last_updated: datetime.datetime | None = None
|
||||
self._spread_load_constant = randrange(TWENTY_MINUTES)
|
||||
|
||||
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
|
||||
self._attr_available = False
|
||||
self._attr_extra_state_attributes = {
|
||||
"app_nickname": None,
|
||||
@@ -768,51 +772,36 @@ class TibberSensorElPrice(TibberSensor):
|
||||
|
||||
self._device_name = self._home_name
|
||||
|
||||
async def async_update(self) -> None:
|
||||
"""Get the latest data and updates the states."""
|
||||
now = dt_util.now()
|
||||
if (
|
||||
not self._tibber_home.last_data_timestamp
|
||||
or (self._tibber_home.last_data_timestamp - now).total_seconds()
|
||||
< 10 * 3600 - self._spread_load_constant
|
||||
or not self.available
|
||||
):
|
||||
_LOGGER.debug("Asking for new data")
|
||||
await self._fetch_data()
|
||||
|
||||
elif (
|
||||
self._tibber_home.price_total
|
||||
and self._last_updated
|
||||
and self._last_updated.hour == now.hour
|
||||
and now - self._last_updated < timedelta(minutes=15)
|
||||
and self._tibber_home.last_data_timestamp
|
||||
):
|
||||
@callback
|
||||
def _handle_coordinator_update(self) -> None:
|
||||
"""Handle updated data from the coordinator."""
|
||||
data = self.coordinator.data
|
||||
if not data:
|
||||
self._attr_available = False
|
||||
return
|
||||
|
||||
res = self._tibber_home.current_price_data()
|
||||
self._attr_native_value, self._last_updated, price_rank = res
|
||||
self._attr_extra_state_attributes["intraday_price_ranking"] = price_rank
|
||||
home_data = data.get(self._tibber_home.home_id)
|
||||
if home_data is None:
|
||||
self._attr_available = False
|
||||
return
|
||||
|
||||
attrs = self._tibber_home.current_attributes()
|
||||
self._attr_extra_state_attributes.update(attrs)
|
||||
self._attr_native_value = home_data.get("current_price")
|
||||
self._attr_extra_state_attributes["intraday_price_ranking"] = home_data.get(
|
||||
"intraday_price_ranking"
|
||||
)
|
||||
if attrs := home_data.get("attributes"):
|
||||
self._attr_extra_state_attributes.update(attrs)
|
||||
self._attr_native_unit_of_measurement = home_data.get("price_unit", "")
|
||||
if (app_nickname := home_data.get("app_nickname")) is not None:
|
||||
self._attr_extra_state_attributes["app_nickname"] = app_nickname
|
||||
if (grid_company := home_data.get("grid_company")) is not None:
|
||||
self._attr_extra_state_attributes["grid_company"] = grid_company
|
||||
if (est_annual := home_data.get("estimated_annual_consumption")) is not None:
|
||||
self._attr_extra_state_attributes["estimated_annual_consumption"] = (
|
||||
est_annual
|
||||
)
|
||||
self._attr_available = self._attr_native_value is not None
|
||||
self._attr_native_unit_of_measurement = self._tibber_home.price_unit
|
||||
|
||||
@Throttle(MIN_TIME_BETWEEN_UPDATES)
|
||||
async def _fetch_data(self) -> None:
|
||||
_LOGGER.debug("Fetching data")
|
||||
try:
|
||||
await self._tibber_home.update_info_and_price_info()
|
||||
except TimeoutError, aiohttp.ClientError:
|
||||
return
|
||||
data = self._tibber_home.info["viewer"]["home"]
|
||||
self._attr_extra_state_attributes["app_nickname"] = data["appNickname"]
|
||||
self._attr_extra_state_attributes["grid_company"] = data["meteringPointData"][
|
||||
"gridCompany"
|
||||
]
|
||||
self._attr_extra_state_attributes["estimated_annual_consumption"] = data[
|
||||
"meteringPointData"
|
||||
]["estimatedAnnualConsumption"]
|
||||
self.async_write_ha_state()
|
||||
|
||||
|
||||
class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
|
||||
|
||||
Reference in New Issue
Block a user