Add a coordinator to Waze Travel Time (#148585)

This commit is contained in:
Etienne C.
2025-08-11 13:20:18 +02:00
committed by GitHub
parent 531073acc0
commit d54f979612
7 changed files with 293 additions and 268 deletions

View File

@@ -1,15 +1,13 @@
"""The waze_travel_time component."""
import asyncio
from collections.abc import Collection
import logging
from typing import Literal
from pywaze.route_calculator import CalcRoutesResponse, WazeRouteCalculator, WRCError
from pywaze.route_calculator import WazeRouteCalculator
import voluptuous as vol
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_REGION, Platform, UnitOfLength
from homeassistant.const import CONF_REGION, Platform
from homeassistant.core import (
HomeAssistant,
ServiceCall,
@@ -27,7 +25,6 @@ from homeassistant.helpers.selector import (
TextSelectorConfig,
TextSelectorType,
)
from homeassistant.util.unit_conversion import DistanceConverter
from .const import (
CONF_AVOID_FERRIES,
@@ -43,13 +40,13 @@ from .const import (
DEFAULT_FILTER,
DEFAULT_VEHICLE_TYPE,
DOMAIN,
IMPERIAL_UNITS,
METRIC_UNITS,
REGIONS,
SEMAPHORE,
UNITS,
VEHICLE_TYPES,
)
from .coordinator import WazeTravelTimeCoordinator, async_get_travel_times
PLATFORMS = [Platform.SENSOR]
@@ -109,6 +106,16 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
if SEMAPHORE not in hass.data.setdefault(DOMAIN, {}):
hass.data.setdefault(DOMAIN, {})[SEMAPHORE] = asyncio.Semaphore(1)
httpx_client = get_async_client(hass)
client = WazeRouteCalculator(
region=config_entry.data[CONF_REGION].upper(), client=httpx_client
)
coordinator = WazeTravelTimeCoordinator(hass, config_entry, client)
config_entry.runtime_data = coordinator
await coordinator.async_config_entry_first_refresh()
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
async def async_get_travel_times_service(service: ServiceCall) -> ServiceResponse:
@@ -140,7 +147,7 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
incl_filters=service.data.get(CONF_INCL_FILTER, DEFAULT_FILTER),
excl_filters=service.data.get(CONF_EXCL_FILTER, DEFAULT_FILTER),
)
return {"routes": [vars(route) for route in response]} if response else None
return {"routes": [vars(route) for route in response]}
hass.services.async_register(
DOMAIN,
@@ -152,106 +159,6 @@ async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> b
return True
async def async_get_travel_times(
client: WazeRouteCalculator,
origin: str,
destination: str,
vehicle_type: str,
avoid_toll_roads: bool,
avoid_subscription_roads: bool,
avoid_ferries: bool,
realtime: bool,
units: Literal["metric", "imperial"] = "metric",
incl_filters: Collection[str] | None = None,
excl_filters: Collection[str] | None = None,
) -> list[CalcRoutesResponse] | None:
"""Get all available routes."""
incl_filters = incl_filters or ()
excl_filters = excl_filters or ()
_LOGGER.debug(
"Getting update for origin: %s destination: %s",
origin,
destination,
)
routes = []
vehicle_type = "" if vehicle_type.upper() == "CAR" else vehicle_type.upper()
try:
routes = await client.calc_routes(
origin,
destination,
vehicle_type=vehicle_type,
avoid_toll_roads=avoid_toll_roads,
avoid_subscription_roads=avoid_subscription_roads,
avoid_ferries=avoid_ferries,
real_time=realtime,
alternatives=3,
)
_LOGGER.debug("Got routes: %s", routes)
incl_routes: list[CalcRoutesResponse] = []
def should_include_route(route: CalcRoutesResponse) -> bool:
if len(incl_filters) < 1:
return True
should_include = any(
street_name in incl_filters or "" in incl_filters
for street_name in route.street_names
)
if not should_include:
_LOGGER.debug(
"Excluding route [%s], because no inclusive filter matched any streetname",
route.name,
)
return False
return True
incl_routes = [route for route in routes if should_include_route(route)]
filtered_routes: list[CalcRoutesResponse] = []
def should_exclude_route(route: CalcRoutesResponse) -> bool:
for street_name in route.street_names:
for excl_filter in excl_filters:
if excl_filter == street_name:
_LOGGER.debug(
"Excluding route, because exclusive filter [%s] matched streetname: %s",
excl_filter,
route.name,
)
return True
return False
filtered_routes = [
route for route in incl_routes if not should_exclude_route(route)
]
if units == IMPERIAL_UNITS:
filtered_routes = [
CalcRoutesResponse(
name=route.name,
distance=DistanceConverter.convert(
route.distance, UnitOfLength.KILOMETERS, UnitOfLength.MILES
),
duration=route.duration,
street_names=route.street_names,
)
for route in filtered_routes
if route.distance is not None
]
if len(filtered_routes) < 1:
_LOGGER.warning("No routes found")
return None
except WRCError as exp:
_LOGGER.warning("Error on retrieving data: %s", exp)
return None
else:
return filtered_routes
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)

View File

@@ -0,0 +1,245 @@
"""The Waze Travel Time data coordinator."""
import asyncio
from collections.abc import Collection
from dataclasses import dataclass
from datetime import timedelta
import logging
from typing import Literal
from pywaze.route_calculator import CalcRoutesResponse, WazeRouteCalculator, WRCError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import UnitOfLength
from homeassistant.core import HomeAssistant
from homeassistant.helpers.location import find_coordinates
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from homeassistant.util.unit_conversion import DistanceConverter
from .const import (
CONF_AVOID_FERRIES,
CONF_AVOID_SUBSCRIPTION_ROADS,
CONF_AVOID_TOLL_ROADS,
CONF_DESTINATION,
CONF_EXCL_FILTER,
CONF_INCL_FILTER,
CONF_ORIGIN,
CONF_REALTIME,
CONF_UNITS,
CONF_VEHICLE_TYPE,
DOMAIN,
IMPERIAL_UNITS,
SEMAPHORE,
)
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=5)
SECONDS_BETWEEN_API_CALLS = 0.5
async def async_get_travel_times(
client: WazeRouteCalculator,
origin: str,
destination: str,
vehicle_type: str,
avoid_toll_roads: bool,
avoid_subscription_roads: bool,
avoid_ferries: bool,
realtime: bool,
units: Literal["metric", "imperial"] = "metric",
incl_filters: Collection[str] | None = None,
excl_filters: Collection[str] | None = None,
) -> list[CalcRoutesResponse]:
"""Get all available routes."""
incl_filters = incl_filters or ()
excl_filters = excl_filters or ()
_LOGGER.debug(
"Getting update for origin: %s destination: %s",
origin,
destination,
)
routes = []
vehicle_type = "" if vehicle_type.upper() == "CAR" else vehicle_type.upper()
try:
routes = await client.calc_routes(
origin,
destination,
vehicle_type=vehicle_type,
avoid_toll_roads=avoid_toll_roads,
avoid_subscription_roads=avoid_subscription_roads,
avoid_ferries=avoid_ferries,
real_time=realtime,
alternatives=3,
)
if len(routes) < 1:
_LOGGER.warning("No routes found")
return routes
_LOGGER.debug("Got routes: %s", routes)
incl_routes: list[CalcRoutesResponse] = []
def should_include_route(route: CalcRoutesResponse) -> bool:
if len(incl_filters) < 1:
return True
should_include = any(
street_name in incl_filters or "" in incl_filters
for street_name in route.street_names
)
if not should_include:
_LOGGER.debug(
"Excluding route [%s], because no inclusive filter matched any streetname",
route.name,
)
return False
return True
incl_routes = [route for route in routes if should_include_route(route)]
filtered_routes: list[CalcRoutesResponse] = []
def should_exclude_route(route: CalcRoutesResponse) -> bool:
for street_name in route.street_names:
for excl_filter in excl_filters:
if excl_filter == street_name:
_LOGGER.debug(
"Excluding route, because exclusive filter [%s] matched streetname: %s",
excl_filter,
route.name,
)
return True
return False
filtered_routes = [
route for route in incl_routes if not should_exclude_route(route)
]
if len(filtered_routes) < 1:
_LOGGER.warning("No routes matched your filters")
return filtered_routes
if units == IMPERIAL_UNITS:
filtered_routes = [
CalcRoutesResponse(
name=route.name,
distance=DistanceConverter.convert(
route.distance, UnitOfLength.KILOMETERS, UnitOfLength.MILES
),
duration=route.duration,
street_names=route.street_names,
)
for route in filtered_routes
if route.distance is not None
]
except WRCError as exp:
raise UpdateFailed(f"Error on retrieving data: {exp}") from exp
else:
return filtered_routes
@dataclass
class WazeTravelTimeData:
"""WazeTravelTime data class."""
origin: str
destination: str
duration: float | None
distance: float | None
route: str | None
class WazeTravelTimeCoordinator(DataUpdateCoordinator[WazeTravelTimeData]):
"""Waze Travel Time DataUpdateCoordinator."""
config_entry: ConfigEntry
def __init__(
self,
hass: HomeAssistant,
config_entry: ConfigEntry,
client: WazeRouteCalculator,
) -> None:
"""Initialize."""
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
config_entry=config_entry,
update_interval=SCAN_INTERVAL,
)
self.client = client
self._origin = config_entry.data[CONF_ORIGIN]
self._destination = config_entry.data[CONF_DESTINATION]
async def _async_update_data(self) -> WazeTravelTimeData:
"""Get the latest data from Waze."""
origin_coordinates = find_coordinates(self.hass, self._origin)
destination_coordinates = find_coordinates(self.hass, self._destination)
_LOGGER.debug(
"Fetching Route for %s, from %s to %s",
self.config_entry.title,
self._origin,
self._destination,
)
await self.hass.data[DOMAIN][SEMAPHORE].acquire()
try:
if origin_coordinates is None or destination_coordinates is None:
raise UpdateFailed("Unable to determine origin or destination")
# Grab options on every update
incl_filter = self.config_entry.options[CONF_INCL_FILTER]
excl_filter = self.config_entry.options[CONF_EXCL_FILTER]
realtime = self.config_entry.options[CONF_REALTIME]
vehicle_type = self.config_entry.options[CONF_VEHICLE_TYPE]
avoid_toll_roads = self.config_entry.options[CONF_AVOID_TOLL_ROADS]
avoid_subscription_roads = self.config_entry.options[
CONF_AVOID_SUBSCRIPTION_ROADS
]
avoid_ferries = self.config_entry.options[CONF_AVOID_FERRIES]
routes = await async_get_travel_times(
self.client,
origin_coordinates,
destination_coordinates,
vehicle_type,
avoid_toll_roads,
avoid_subscription_roads,
avoid_ferries,
realtime,
self.config_entry.options[CONF_UNITS],
incl_filter,
excl_filter,
)
if len(routes) < 1:
travel_data = WazeTravelTimeData(
origin=origin_coordinates,
destination=destination_coordinates,
duration=None,
distance=None,
route=None,
)
else:
route = routes[0]
travel_data = WazeTravelTimeData(
origin=origin_coordinates,
destination=destination_coordinates,
duration=route.duration,
distance=route.distance,
route=route.name,
)
await asyncio.sleep(SECONDS_BETWEEN_API_CALLS)
finally:
self.hass.data[DOMAIN][SEMAPHORE].release()
return travel_data

View File

@@ -2,56 +2,22 @@
from __future__ import annotations
import asyncio
from datetime import timedelta
import logging
from typing import Any
import httpx
from pywaze.route_calculator import WazeRouteCalculator
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorStateClass,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_NAME,
CONF_REGION,
EVENT_HOMEASSISTANT_STARTED,
UnitOfTime,
)
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.const import CONF_NAME, UnitOfTime
from homeassistant.core import HomeAssistant
from homeassistant.helpers.device_registry import DeviceEntryType, DeviceInfo
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.httpx_client import get_async_client
from homeassistant.helpers.location import find_coordinates
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from . import async_get_travel_times
from .const import (
CONF_AVOID_FERRIES,
CONF_AVOID_SUBSCRIPTION_ROADS,
CONF_AVOID_TOLL_ROADS,
CONF_DESTINATION,
CONF_EXCL_FILTER,
CONF_INCL_FILTER,
CONF_ORIGIN,
CONF_REALTIME,
CONF_UNITS,
CONF_VEHICLE_TYPE,
DEFAULT_NAME,
DOMAIN,
SEMAPHORE,
)
_LOGGER = logging.getLogger(__name__)
SCAN_INTERVAL = timedelta(minutes=5)
PARALLEL_UPDATES = 1
SECONDS_BETWEEN_API_CALLS = 0.5
from .const import DEFAULT_NAME, DOMAIN
from .coordinator import WazeTravelTimeCoordinator
async def async_setup_entry(
@@ -60,23 +26,15 @@ async def async_setup_entry(
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up a Waze travel time sensor entry."""
destination = config_entry.data[CONF_DESTINATION]
origin = config_entry.data[CONF_ORIGIN]
region = config_entry.data[CONF_REGION]
name = config_entry.data.get(CONF_NAME, DEFAULT_NAME)
coordinator = config_entry.runtime_data
data = WazeTravelTimeData(
region,
get_async_client(hass),
config_entry,
)
sensor = WazeTravelTime(config_entry.entry_id, name, origin, destination, data)
sensor = WazeTravelTimeSensor(config_entry.entry_id, name, coordinator)
async_add_entities([sensor], False)
class WazeTravelTime(SensorEntity):
class WazeTravelTimeSensor(CoordinatorEntity[WazeTravelTimeCoordinator], SensorEntity):
"""Representation of a Waze travel time sensor."""
_attr_attribution = "Powered by Waze"
@@ -95,119 +53,33 @@ class WazeTravelTime(SensorEntity):
self,
unique_id: str,
name: str,
origin: str,
destination: str,
waze_data: WazeTravelTimeData,
coordinator: WazeTravelTimeCoordinator,
) -> None:
"""Initialize the Waze travel time sensor."""
super().__init__(coordinator)
self._attr_unique_id = unique_id
self._waze_data = waze_data
self._attr_name = name
self._origin = origin
self._destination = destination
self._state = None
async def async_added_to_hass(self) -> None:
"""Handle when entity is added."""
if self.hass.state is not CoreState.running:
self.hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STARTED, self.first_update
)
else:
await self.first_update()
@property
def native_value(self) -> float | None:
"""Return the state of the sensor."""
if self._waze_data.duration is not None:
return round(self._waze_data.duration)
if (
self.coordinator.data is not None
and self.coordinator.data.duration is not None
):
return round(self.coordinator.data.duration)
return None
@property
def extra_state_attributes(self) -> dict[str, Any] | None:
"""Return the state attributes of the last update."""
if self._waze_data.duration is None:
if self.coordinator.data is None:
return None
return {
"duration": self._waze_data.duration,
"distance": self._waze_data.distance,
"route": self._waze_data.route,
"origin": self._waze_data.origin,
"destination": self._waze_data.destination,
"duration": self.coordinator.data.duration,
"distance": self.coordinator.data.distance,
"route": self.coordinator.data.route,
"origin": self.coordinator.data.origin,
"destination": self.coordinator.data.destination,
}
async def first_update(self, _=None) -> None:
"""Run first update and write state."""
await self.async_update()
self.async_write_ha_state()
async def async_update(self) -> None:
"""Fetch new state data for the sensor."""
_LOGGER.debug("Fetching Route for %s", self._attr_name)
self._waze_data.origin = find_coordinates(self.hass, self._origin)
self._waze_data.destination = find_coordinates(self.hass, self._destination)
await self.hass.data[DOMAIN][SEMAPHORE].acquire()
try:
await self._waze_data.async_update()
await asyncio.sleep(SECONDS_BETWEEN_API_CALLS)
finally:
self.hass.data[DOMAIN][SEMAPHORE].release()
class WazeTravelTimeData:
"""WazeTravelTime Data object."""
def __init__(
self, region: str, client: httpx.AsyncClient, config_entry: ConfigEntry
) -> None:
"""Set up WazeRouteCalculator."""
self.config_entry = config_entry
self.client = WazeRouteCalculator(region=region, client=client)
self.origin: str | None = None
self.destination: str | None = None
self.duration = None
self.distance = None
self.route = None
async def async_update(self):
"""Update WazeRouteCalculator Sensor."""
_LOGGER.debug(
"Getting update for origin: %s destination: %s",
self.origin,
self.destination,
)
if self.origin is not None and self.destination is not None:
# Grab options on every update
incl_filter = self.config_entry.options[CONF_INCL_FILTER]
excl_filter = self.config_entry.options[CONF_EXCL_FILTER]
realtime = self.config_entry.options[CONF_REALTIME]
vehicle_type = self.config_entry.options[CONF_VEHICLE_TYPE]
avoid_toll_roads = self.config_entry.options[CONF_AVOID_TOLL_ROADS]
avoid_subscription_roads = self.config_entry.options[
CONF_AVOID_SUBSCRIPTION_ROADS
]
avoid_ferries = self.config_entry.options[CONF_AVOID_FERRIES]
routes = await async_get_travel_times(
self.client,
self.origin,
self.destination,
vehicle_type,
avoid_toll_roads,
avoid_subscription_roads,
avoid_ferries,
realtime,
self.config_entry.options[CONF_UNITS],
incl_filter,
excl_filter,
)
if routes:
route = routes[0]
else:
_LOGGER.warning("No routes found")
return
self.duration = route.duration
self.distance = route.distance
self.route = route.name

View File

@@ -53,7 +53,7 @@ def mock_update_fixture():
@pytest.fixture(name="validate_config_entry")
def validate_config_entry_fixture(mock_update):
"""Return valid config entry."""
mock_update.return_value = None
mock_update.return_value = []
return mock_update

View File

@@ -116,8 +116,8 @@ async def test_options(hass: HomeAssistant) -> None:
CONF_AVOID_FERRIES: True,
CONF_AVOID_SUBSCRIPTION_ROADS: True,
CONF_AVOID_TOLL_ROADS: True,
CONF_EXCL_FILTER: ["exclude"],
CONF_INCL_FILTER: ["include"],
CONF_EXCL_FILTER: ["ExcludeThis"],
CONF_INCL_FILTER: ["IncludeThis"],
CONF_REALTIME: False,
CONF_UNITS: IMPERIAL_UNITS,
CONF_VEHICLE_TYPE: "taxi",
@@ -129,8 +129,8 @@ async def test_options(hass: HomeAssistant) -> None:
CONF_AVOID_FERRIES: True,
CONF_AVOID_SUBSCRIPTION_ROADS: True,
CONF_AVOID_TOLL_ROADS: True,
CONF_EXCL_FILTER: ["exclude"],
CONF_INCL_FILTER: ["include"],
CONF_EXCL_FILTER: ["ExcludeThis"],
CONF_INCL_FILTER: ["IncludeThis"],
CONF_REALTIME: False,
CONF_UNITS: IMPERIAL_UNITS,
CONF_VEHICLE_TYPE: "taxi",
@@ -140,8 +140,8 @@ async def test_options(hass: HomeAssistant) -> None:
CONF_AVOID_FERRIES: True,
CONF_AVOID_SUBSCRIPTION_ROADS: True,
CONF_AVOID_TOLL_ROADS: True,
CONF_EXCL_FILTER: ["exclude"],
CONF_INCL_FILTER: ["include"],
CONF_EXCL_FILTER: ["ExcludeThis"],
CONF_INCL_FILTER: ["IncludeThis"],
CONF_REALTIME: False,
CONF_UNITS: IMPERIAL_UNITS,
CONF_VEHICLE_TYPE: "taxi",

View File

@@ -101,8 +101,8 @@ async def test_migrate_entry_v1_v2(hass: HomeAssistant) -> None:
CONF_AVOID_FERRIES: DEFAULT_AVOID_FERRIES,
CONF_AVOID_SUBSCRIPTION_ROADS: DEFAULT_AVOID_SUBSCRIPTION_ROADS,
CONF_AVOID_TOLL_ROADS: DEFAULT_AVOID_TOLL_ROADS,
CONF_INCL_FILTER: "include",
CONF_EXCL_FILTER: "exclude",
CONF_INCL_FILTER: "IncludeThis",
CONF_EXCL_FILTER: "ExcludeThis",
},
)
@@ -114,5 +114,5 @@ async def test_migrate_entry_v1_v2(hass: HomeAssistant) -> None:
assert updated_entry.state is ConfigEntryState.LOADED
assert updated_entry.version == 2
assert updated_entry.options[CONF_INCL_FILTER] == ["include"]
assert updated_entry.options[CONF_EXCL_FILTER] == ["exclude"]
assert updated_entry.options[CONF_INCL_FILTER] == ["IncludeThis"]
assert updated_entry.options[CONF_EXCL_FILTER] == ["ExcludeThis"]

View File

@@ -18,6 +18,7 @@ from homeassistant.components.waze_travel_time.const import (
IMPERIAL_UNITS,
METRIC_UNITS,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.core import HomeAssistant
from .const import MOCK_CONFIG
@@ -153,5 +154,5 @@ async def test_sensor_failed_wrcerror(
await hass.config_entries.async_setup(config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get("sensor.waze_travel_time").state == "unknown"
assert config_entry.state is ConfigEntryState.SETUP_RETRY
assert "Error on retrieving data: " in caplog.text