mirror of
https://github.com/home-assistant/core.git
synced 2026-03-04 14:57:07 +01:00
Compare commits
5 Commits
copilot/su
...
tibber_ref
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
15b4d07c13 | ||
|
|
6aeb95c25c | ||
|
|
b502409a4c | ||
|
|
b925045cc4 | ||
|
|
81cd811150 |
@@ -2,9 +2,9 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import timedelta
|
||||
from datetime import datetime, timedelta
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, cast
|
||||
from typing import TYPE_CHECKING, TypedDict, cast
|
||||
|
||||
from aiohttp.client_exceptions import ClientError
|
||||
import tibber
|
||||
@@ -38,6 +38,58 @@ FIVE_YEARS = 5 * 365 * 24
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TibberHomeData(TypedDict):
|
||||
"""Data for a Tibber home used by the price sensor."""
|
||||
|
||||
currency: str
|
||||
price_unit: str
|
||||
current_price: float | None
|
||||
current_price_time: datetime | None
|
||||
intraday_price_ranking: float | None
|
||||
max_price: float
|
||||
avg_price: float
|
||||
min_price: float
|
||||
off_peak_1: float
|
||||
peak: float
|
||||
off_peak_2: float
|
||||
month_cost: float | None
|
||||
peak_hour: float | None
|
||||
peak_hour_time: datetime | None
|
||||
month_cons: float | None
|
||||
app_nickname: str | None
|
||||
grid_company: str | None
|
||||
estimated_annual_consumption: int | None
|
||||
|
||||
|
||||
def _build_home_data(home: tibber.TibberHome) -> TibberHomeData:
|
||||
"""Build TibberHomeData from a TibberHome for the price sensor."""
|
||||
current_price, last_updated, price_rank = home.current_price_data()
|
||||
attributes = home.current_attributes()
|
||||
result: TibberHomeData = {
|
||||
"currency": home.currency,
|
||||
"price_unit": home.price_unit,
|
||||
"current_price": current_price,
|
||||
"current_price_time": last_updated,
|
||||
"intraday_price_ranking": price_rank,
|
||||
"max_price": attributes["max_price"],
|
||||
"avg_price": attributes["avg_price"],
|
||||
"min_price": attributes["min_price"],
|
||||
"off_peak_1": attributes["off_peak_1"],
|
||||
"peak": attributes["peak"],
|
||||
"off_peak_2": attributes["off_peak_2"],
|
||||
"month_cost": home.month_cost,
|
||||
"peak_hour": home.peak_hour,
|
||||
"peak_hour_time": home.peak_hour_time,
|
||||
"month_cons": home.month_cons,
|
||||
"app_nickname": home.info["viewer"]["home"].get("appNickname"),
|
||||
"grid_company": home.info["viewer"]["home"]["meteringPointData"]["gridCompany"],
|
||||
"estimated_annual_consumption": home.info["viewer"]["home"][
|
||||
"meteringPointData"
|
||||
]["estimatedAnnualConsumption"],
|
||||
}
|
||||
return result
|
||||
|
||||
|
||||
class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
"""Handle Tibber data and insert statistics."""
|
||||
|
||||
@@ -57,13 +109,16 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
name=f"Tibber {tibber_connection.name}",
|
||||
update_interval=timedelta(minutes=20),
|
||||
)
|
||||
self._tibber_connection = tibber_connection
|
||||
|
||||
async def _async_update_data(self) -> None:
|
||||
"""Update data via API."""
|
||||
tibber_connection = await self.config_entry.runtime_data.async_get_client(
|
||||
self.hass
|
||||
)
|
||||
|
||||
try:
|
||||
await self._tibber_connection.fetch_consumption_data_active_homes()
|
||||
await self._tibber_connection.fetch_production_data_active_homes()
|
||||
await tibber_connection.fetch_consumption_data_active_homes()
|
||||
await tibber_connection.fetch_production_data_active_homes()
|
||||
await self._insert_statistics()
|
||||
except tibber.RetryableHttpExceptionError as err:
|
||||
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||
@@ -75,7 +130,10 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
|
||||
async def _insert_statistics(self) -> None:
|
||||
"""Insert Tibber statistics."""
|
||||
for home in self._tibber_connection.get_homes():
|
||||
tibber_connection = await self.config_entry.runtime_data.async_get_client(
|
||||
self.hass
|
||||
)
|
||||
for home in tibber_connection.get_homes():
|
||||
sensors: list[tuple[str, bool, str | None, str]] = []
|
||||
if home.hourly_consumption_data:
|
||||
sensors.append(
|
||||
@@ -194,6 +252,74 @@ class TibberDataCoordinator(DataUpdateCoordinator[None]):
|
||||
async_add_external_statistics(self.hass, metadata, statistics)
|
||||
|
||||
|
||||
class TibberPriceCoordinator(DataUpdateCoordinator[dict[str, TibberHomeData]]):
|
||||
"""Handle Tibber price data and insert statistics."""
|
||||
|
||||
config_entry: TibberConfigEntry
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
hass: HomeAssistant,
|
||||
config_entry: TibberConfigEntry,
|
||||
) -> None:
|
||||
"""Initialize the price coordinator."""
|
||||
super().__init__(
|
||||
hass,
|
||||
_LOGGER,
|
||||
config_entry=config_entry,
|
||||
name=f"{DOMAIN} price",
|
||||
update_interval=timedelta(minutes=1),
|
||||
)
|
||||
|
||||
def _seconds_until_next_15_minute(self) -> float:
|
||||
"""Return seconds until the next 15-minute boundary (0, 15, 30, 45) in UTC."""
|
||||
now = dt_util.utcnow()
|
||||
next_minute = ((now.minute // 15) + 1) * 15
|
||||
if next_minute >= 60:
|
||||
next_run = now.replace(minute=0, second=0, microsecond=0) + timedelta(
|
||||
hours=1
|
||||
)
|
||||
else:
|
||||
next_run = now.replace(
|
||||
minute=next_minute, second=0, microsecond=0, tzinfo=dt_util.UTC
|
||||
)
|
||||
return (next_run - now).total_seconds()
|
||||
|
||||
async def _async_update_data(self) -> dict[str, TibberHomeData]:
|
||||
"""Update data via API and return per-home data for sensors."""
|
||||
tibber_connection = await self.config_entry.runtime_data.async_get_client(
|
||||
self.hass
|
||||
)
|
||||
try:
|
||||
await tibber_connection.fetch_consumption_data_active_homes()
|
||||
await tibber_connection.fetch_production_data_active_homes()
|
||||
now = dt_util.now()
|
||||
for home in tibber_connection.get_homes(only_active=True):
|
||||
update_needed = False
|
||||
last_data_timestamp = home.last_data_timestamp
|
||||
|
||||
if last_data_timestamp is None:
|
||||
update_needed = True
|
||||
else:
|
||||
remaining_seconds = (last_data_timestamp - now).total_seconds()
|
||||
if remaining_seconds < 11 * 3600:
|
||||
update_needed = True
|
||||
|
||||
if update_needed:
|
||||
await home.update_info_and_price_info()
|
||||
except tibber.RetryableHttpExceptionError as err:
|
||||
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||
except tibber.FatalHttpExceptionError as err:
|
||||
raise UpdateFailed(f"Error communicating with API ({err.status})") from err
|
||||
|
||||
result: dict[str, TibberHomeData] = {}
|
||||
for home in tibber_connection.get_homes(only_active=True):
|
||||
result[home.home_id] = _build_home_data(home)
|
||||
|
||||
self.update_interval = timedelta(seconds=self._seconds_until_next_15_minute())
|
||||
return result
|
||||
|
||||
|
||||
class TibberDataAPICoordinator(DataUpdateCoordinator[dict[str, TibberDevice]]):
|
||||
"""Fetch and cache Tibber Data API device capabilities."""
|
||||
|
||||
|
||||
@@ -3,10 +3,8 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
import datetime
|
||||
from datetime import timedelta
|
||||
import logging
|
||||
from random import randrange
|
||||
from typing import Any
|
||||
|
||||
import aiohttp
|
||||
@@ -42,18 +40,20 @@ from homeassistant.helpers.update_coordinator import (
|
||||
CoordinatorEntity,
|
||||
DataUpdateCoordinator,
|
||||
)
|
||||
from homeassistant.util import Throttle, dt as dt_util
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .const import DOMAIN, MANUFACTURER, TibberConfigEntry
|
||||
from .coordinator import TibberDataAPICoordinator, TibberDataCoordinator
|
||||
from .coordinator import (
|
||||
TibberDataAPICoordinator,
|
||||
TibberDataCoordinator,
|
||||
TibberPriceCoordinator,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
ICON = "mdi:currency-usd"
|
||||
SCAN_INTERVAL = timedelta(minutes=1)
|
||||
MIN_TIME_BETWEEN_UPDATES = timedelta(minutes=5)
|
||||
PARALLEL_UPDATES = 0
|
||||
TWENTY_MINUTES = 20 * 60
|
||||
|
||||
RT_SENSORS_UNIQUE_ID_MIGRATION = {
|
||||
"accumulated_consumption_last_hour": "accumulated consumption current hour",
|
||||
@@ -610,6 +610,7 @@ async def _async_setup_graphql_sensors(
|
||||
entity_registry = er.async_get(hass)
|
||||
|
||||
coordinator: TibberDataCoordinator | None = None
|
||||
price_coordinator: TibberPriceCoordinator | None = None
|
||||
entities: list[TibberSensor] = []
|
||||
for home in tibber_connection.get_homes(only_active=False):
|
||||
try:
|
||||
@@ -626,7 +627,9 @@ async def _async_setup_graphql_sensors(
|
||||
raise PlatformNotReady from err
|
||||
|
||||
if home.has_active_subscription:
|
||||
entities.append(TibberSensorElPrice(home))
|
||||
if price_coordinator is None:
|
||||
price_coordinator = TibberPriceCoordinator(hass, entry)
|
||||
entities.append(TibberSensorElPrice(price_coordinator, home))
|
||||
if coordinator is None:
|
||||
coordinator = TibberDataCoordinator(hass, entry, tibber_connection)
|
||||
entities.extend(
|
||||
@@ -737,18 +740,19 @@ class TibberSensor(SensorEntity):
|
||||
return device_info
|
||||
|
||||
|
||||
class TibberSensorElPrice(TibberSensor):
|
||||
class TibberSensorElPrice(TibberSensor, CoordinatorEntity[TibberPriceCoordinator]):
|
||||
"""Representation of a Tibber sensor for el price."""
|
||||
|
||||
_attr_state_class = SensorStateClass.MEASUREMENT
|
||||
_attr_translation_key = "electricity_price"
|
||||
|
||||
def __init__(self, tibber_home: TibberHome) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: TibberPriceCoordinator,
|
||||
tibber_home: TibberHome,
|
||||
) -> None:
|
||||
"""Initialize the sensor."""
|
||||
super().__init__(tibber_home=tibber_home)
|
||||
self._last_updated: datetime.datetime | None = None
|
||||
self._spread_load_constant = randrange(TWENTY_MINUTES)
|
||||
|
||||
super().__init__(coordinator=coordinator, tibber_home=tibber_home)
|
||||
self._attr_available = False
|
||||
self._attr_extra_state_attributes = {
|
||||
"app_nickname": None,
|
||||
@@ -768,51 +772,36 @@ class TibberSensorElPrice(TibberSensor):
|
||||
|
||||
self._device_name = self._home_name
|
||||
|
||||
async def async_update(self) -> None:
|
||||
"""Get the latest data and updates the states."""
|
||||
now = dt_util.now()
|
||||
if (
|
||||
not self._tibber_home.last_data_timestamp
|
||||
or (self._tibber_home.last_data_timestamp - now).total_seconds()
|
||||
< 10 * 3600 - self._spread_load_constant
|
||||
or not self.available
|
||||
):
|
||||
_LOGGER.debug("Asking for new data")
|
||||
await self._fetch_data()
|
||||
|
||||
elif (
|
||||
self._tibber_home.price_total
|
||||
and self._last_updated
|
||||
and self._last_updated.hour == now.hour
|
||||
and now - self._last_updated < timedelta(minutes=15)
|
||||
and self._tibber_home.last_data_timestamp
|
||||
):
|
||||
@callback
|
||||
def _handle_coordinator_update(self) -> None:
|
||||
"""Handle updated data from the coordinator."""
|
||||
data = self.coordinator.data
|
||||
if not data:
|
||||
self._attr_available = False
|
||||
return
|
||||
|
||||
res = self._tibber_home.current_price_data()
|
||||
self._attr_native_value, self._last_updated, price_rank = res
|
||||
self._attr_extra_state_attributes["intraday_price_ranking"] = price_rank
|
||||
|
||||
attrs = self._tibber_home.current_attributes()
|
||||
self._attr_extra_state_attributes.update(attrs)
|
||||
self._attr_available = self._attr_native_value is not None
|
||||
self._attr_native_unit_of_measurement = self._tibber_home.price_unit
|
||||
|
||||
@Throttle(MIN_TIME_BETWEEN_UPDATES)
|
||||
async def _fetch_data(self) -> None:
|
||||
_LOGGER.debug("Fetching data")
|
||||
try:
|
||||
await self._tibber_home.update_info_and_price_info()
|
||||
except TimeoutError, aiohttp.ClientError:
|
||||
home_data = data.get(self._tibber_home.home_id)
|
||||
if home_data is None:
|
||||
self._attr_available = False
|
||||
return
|
||||
data = self._tibber_home.info["viewer"]["home"]
|
||||
self._attr_extra_state_attributes["app_nickname"] = data["appNickname"]
|
||||
self._attr_extra_state_attributes["grid_company"] = data["meteringPointData"][
|
||||
"gridCompany"
|
||||
|
||||
self._attr_native_value = home_data.get("current_price")
|
||||
self._attr_extra_state_attributes["intraday_price_ranking"] = home_data.get(
|
||||
"intraday_price_ranking"
|
||||
)
|
||||
self._attr_extra_state_attributes["max_price"] = home_data["max_price"]
|
||||
self._attr_extra_state_attributes["avg_price"] = home_data["avg_price"]
|
||||
self._attr_extra_state_attributes["min_price"] = home_data["min_price"]
|
||||
self._attr_extra_state_attributes["off_peak_1"] = home_data["off_peak_1"]
|
||||
self._attr_extra_state_attributes["peak"] = home_data["peak"]
|
||||
self._attr_extra_state_attributes["off_peak_2"] = home_data["off_peak_2"]
|
||||
self._attr_extra_state_attributes["app_nickname"] = home_data["app_nickname"]
|
||||
self._attr_extra_state_attributes["grid_company"] = home_data["grid_company"]
|
||||
self._attr_extra_state_attributes["estimated_annual_consumption"] = home_data[
|
||||
"estimated_annual_consumption"
|
||||
]
|
||||
self._attr_extra_state_attributes["estimated_annual_consumption"] = data[
|
||||
"meteringPointData"
|
||||
]["estimatedAnnualConsumption"]
|
||||
self._attr_available = self._attr_native_value is not None
|
||||
self.async_write_ha_state()
|
||||
|
||||
|
||||
class TibberDataSensor(TibberSensor, CoordinatorEntity[TibberDataCoordinator]):
|
||||
|
||||
@@ -24,6 +24,10 @@ async def test_async_setup_entry(
|
||||
tibber_connection.fetch_production_data_active_homes.return_value = None
|
||||
tibber_connection.get_homes = mock_get_homes
|
||||
|
||||
runtime_data = AsyncMock()
|
||||
runtime_data.async_get_client.return_value = tibber_connection
|
||||
config_entry.runtime_data = runtime_data
|
||||
|
||||
coordinator = TibberDataCoordinator(hass, config_entry, tibber_connection)
|
||||
await coordinator._async_update_data()
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
Reference in New Issue
Block a user