mirror of
https://github.com/home-assistant/core.git
synced 2026-06-28 17:46:02 +02:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| be14ef7158 | |||
| b6166ac675 | |||
| 1e41509c0e | |||
| 53ed4c42bd | |||
| 21f7836117 |
@@ -60,6 +60,7 @@ from .const import (
|
||||
DATA_HASSIO_SUPERVISOR_USER,
|
||||
DATA_KEY_SUPERVISOR_ISSUES,
|
||||
DOMAIN,
|
||||
JOBS_COORDINATOR,
|
||||
MAIN_COORDINATOR,
|
||||
STATS_COORDINATOR,
|
||||
)
|
||||
@@ -67,6 +68,7 @@ from .coordinator import (
|
||||
HassioAddOnDataUpdateCoordinator,
|
||||
HassioMainDataUpdateCoordinator,
|
||||
HassioStatsDataUpdateCoordinator,
|
||||
SupervisorJobsCoordinator,
|
||||
get_addons_info,
|
||||
get_addons_list,
|
||||
get_addons_stats,
|
||||
@@ -326,9 +328,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
await coordinator.async_config_entry_first_refresh()
|
||||
hass.data[MAIN_COORDINATOR] = coordinator
|
||||
|
||||
addon_coordinator = HassioAddOnDataUpdateCoordinator(
|
||||
hass, entry, dev_reg, coordinator.jobs
|
||||
)
|
||||
jobs_coordinator = SupervisorJobsCoordinator(hass, entry)
|
||||
await jobs_coordinator.async_config_entry_first_refresh()
|
||||
hass.data[JOBS_COORDINATOR] = jobs_coordinator
|
||||
|
||||
addon_coordinator = HassioAddOnDataUpdateCoordinator(hass, entry, dev_reg)
|
||||
await addon_coordinator.async_config_entry_first_refresh()
|
||||
hass.data[ADDONS_COORDINATOR] = addon_coordinator
|
||||
|
||||
@@ -437,5 +441,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
|
||||
hass.data.pop(MAIN_COORDINATOR, None)
|
||||
hass.data.pop(ADDONS_COORDINATOR, None)
|
||||
hass.data.pop(STATS_COORDINATOR, None)
|
||||
hass.data.pop(JOBS_COORDINATOR, None)
|
||||
|
||||
return unload_ok
|
||||
|
||||
@@ -27,6 +27,7 @@ if TYPE_CHECKING:
|
||||
HassioAddOnDataUpdateCoordinator,
|
||||
HassioMainDataUpdateCoordinator,
|
||||
HassioStatsDataUpdateCoordinator,
|
||||
SupervisorJobsCoordinator,
|
||||
)
|
||||
from .handler import HassIO
|
||||
from .issues import SupervisorIssues
|
||||
@@ -103,6 +104,9 @@ ADDONS_COORDINATOR: HassKey[HassioAddOnDataUpdateCoordinator] = HassKey(
|
||||
STATS_COORDINATOR: HassKey[HassioStatsDataUpdateCoordinator] = HassKey(
|
||||
"hassio_stats_coordinator"
|
||||
)
|
||||
JOBS_COORDINATOR: HassKey[SupervisorJobsCoordinator] = HassKey(
|
||||
"hassio_jobs_coordinator"
|
||||
)
|
||||
|
||||
|
||||
DATA_COMPONENT: HassKey[HassIO] = HassKey(DOMAIN)
|
||||
@@ -126,6 +130,7 @@ DATA_ADDONS_LIST: HassKey[list[InstalledAddon]] = HassKey("hassio_addons_list")
|
||||
HASSIO_MAIN_UPDATE_INTERVAL = timedelta(minutes=5)
|
||||
HASSIO_ADDON_UPDATE_INTERVAL = timedelta(minutes=15)
|
||||
HASSIO_STATS_UPDATE_INTERVAL = timedelta(seconds=60)
|
||||
SUPERVISOR_JOBS_UPDATE_INTERVAL = timedelta(minutes=15)
|
||||
|
||||
ATTR_AUTO_UPDATE = "auto_update"
|
||||
ATTR_VERSION = "version"
|
||||
|
||||
@@ -2,10 +2,11 @@
|
||||
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
from collections.abc import Awaitable
|
||||
from dataclasses import dataclass
|
||||
from collections.abc import Awaitable, Callable
|
||||
from dataclasses import dataclass, replace
|
||||
import logging
|
||||
from typing import TYPE_CHECKING, Any, cast, override
|
||||
from uuid import UUID
|
||||
|
||||
from aiohasupervisor import SupervisorError, SupervisorNotFoundError
|
||||
from aiohasupervisor.models import (
|
||||
@@ -17,6 +18,7 @@ from aiohasupervisor.models import (
|
||||
HostInfo,
|
||||
InstalledAddon,
|
||||
InstalledAddonComplete,
|
||||
Job,
|
||||
NetworkInfo,
|
||||
NFSMountResponse,
|
||||
OSInfo,
|
||||
@@ -29,7 +31,12 @@ from aiohasupervisor.models import (
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import ATTR_MANUFACTURER
|
||||
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
|
||||
from homeassistant.core import (
|
||||
CALLBACK_TYPE,
|
||||
HomeAssistant,
|
||||
callback,
|
||||
is_callback_check_partial,
|
||||
)
|
||||
from homeassistant.helpers import device_registry as dr
|
||||
from homeassistant.helpers.debounce import Debouncer
|
||||
from homeassistant.helpers.device_registry import DeviceInfo
|
||||
@@ -59,6 +66,7 @@ from .const import (
|
||||
DATA_SUPERVISOR_INFO,
|
||||
DATA_SUPERVISOR_STATS,
|
||||
DOMAIN,
|
||||
EVENT_JOB,
|
||||
EVENT_SUPERVISOR_EVENT,
|
||||
EVENT_SUPERVISOR_UPDATE,
|
||||
HASSIO_ADDON_UPDATE_INTERVAL,
|
||||
@@ -67,12 +75,12 @@ from .const import (
|
||||
REQUEST_REFRESH_DELAY,
|
||||
STARTUP_COMPLETE,
|
||||
SUPERVISOR_CONTAINER,
|
||||
SUPERVISOR_JOBS_UPDATE_INTERVAL,
|
||||
UPDATE_KEY_SUPERVISOR,
|
||||
SupervisorEntityModel,
|
||||
)
|
||||
from .exceptions import HassioNotReadyError
|
||||
from .handler import get_supervisor_client
|
||||
from .jobs import SupervisorJobs
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .issues import SupervisorIssues
|
||||
@@ -80,6 +88,196 @@ if TYPE_CHECKING:
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class JobSubscription:
|
||||
"""Subscribe for updates on jobs which match filters.
|
||||
|
||||
UUID is preferred match but only available in cases of a background API that
|
||||
returns the UUID before taking the action. Others are used to match jobs only
|
||||
if UUID is omitted. Either name or UUID is required to be able to match.
|
||||
|
||||
event_callback must be safe annotated as a homeassistant.core.callback
|
||||
and safe to call in the event loop.
|
||||
"""
|
||||
|
||||
event_callback: Callable[[Job], None]
|
||||
uuid: str | None = None
|
||||
name: str | None = None
|
||||
reference: str | None = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
"""Validate at least one filter option is present."""
|
||||
if not self.name and not self.uuid:
|
||||
raise ValueError("Either name or uuid must be provided!")
|
||||
if not is_callback_check_partial(self.event_callback):
|
||||
raise ValueError("event_callback must be a homeassistant.core.callback!")
|
||||
|
||||
def matches(self, job: Job) -> bool:
|
||||
"""Return true if job matches subscription filters."""
|
||||
if self.uuid:
|
||||
return job.uuid == self.uuid
|
||||
return job.name == self.name and self.reference in (None, job.reference)
|
||||
|
||||
|
||||
class SupervisorJobsCoordinator(DataUpdateCoordinator[dict[UUID, Job]]):
|
||||
"""Manage access to Supervisor jobs."""
|
||||
|
||||
config_entry: ConfigEntry
|
||||
|
||||
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
|
||||
"""Initialize object."""
|
||||
super().__init__(
|
||||
hass,
|
||||
_LOGGER,
|
||||
config_entry=config_entry,
|
||||
name="SupervisorJobsCoordinator",
|
||||
update_interval=SUPERVISOR_JOBS_UPDATE_INTERVAL,
|
||||
# We don't want an immediate refresh since we want to avoid
|
||||
# hammering the Supervisor API on startup
|
||||
request_refresh_debouncer=Debouncer(
|
||||
hass, _LOGGER, cooldown=REQUEST_REFRESH_DELAY, immediate=False
|
||||
),
|
||||
)
|
||||
self._supervisor_client = get_supervisor_client(hass)
|
||||
self._subscriptions: set[JobSubscription] = set()
|
||||
self._dispatcher_disconnect: Callable[[], None] | None = None
|
||||
self._noop_listener_disconnect: Callable[[], None] | None = None
|
||||
|
||||
@property
|
||||
def current_jobs(self) -> list[Job]:
|
||||
"""Return current jobs."""
|
||||
return list(self.data.values()) if self.data is not None else []
|
||||
|
||||
@staticmethod
|
||||
def _build_jobs(jobs: list[Job]) -> dict[UUID, Job]:
|
||||
"""Flatten jobs and child jobs into a UUID keyed cache."""
|
||||
job_queue: list[Job] = jobs.copy()
|
||||
cached_jobs: dict[UUID, Job] = {}
|
||||
|
||||
while job_queue:
|
||||
job = job_queue.pop(0)
|
||||
job_queue.extend(job.child_jobs)
|
||||
cached_jobs[job.uuid] = replace(job, child_jobs=[])
|
||||
|
||||
return cached_jobs
|
||||
|
||||
async def _async_update_data(self) -> dict[UUID, Job]:
|
||||
"""Fetch data from Supervisor."""
|
||||
job_data = await self._supervisor_client.jobs.info()
|
||||
return self._build_jobs(job_data.jobs)
|
||||
|
||||
def _process_job_change(self, job: Job) -> None:
|
||||
"""Process a job change by triggering callbacks on subscribers."""
|
||||
for sub in self._subscriptions:
|
||||
if sub.matches(job):
|
||||
sub.event_callback(job)
|
||||
|
||||
def _process_job_deltas(
|
||||
self,
|
||||
previous_jobs: dict[UUID, Job],
|
||||
current_jobs: dict[UUID, Job],
|
||||
) -> None:
|
||||
"""Notify subscribers about changes between two job caches."""
|
||||
for job in current_jobs.values():
|
||||
if (previous_job := previous_jobs.get(job.uuid)) is not None and (
|
||||
previous_job == job
|
||||
):
|
||||
continue
|
||||
self._process_job_change(job)
|
||||
|
||||
for uuid, job in previous_jobs.items():
|
||||
if uuid not in current_jobs and job.done is False:
|
||||
self._process_job_change(replace(job, done=True))
|
||||
|
||||
def subscribe(self, subscription: JobSubscription) -> CALLBACK_TYPE:
|
||||
"""Subscribe to updates for job. Return callback is used to unsubscribe.
|
||||
|
||||
If any jobs match the subscription at the time this is called, runs the
|
||||
callback on them.
|
||||
"""
|
||||
self._subscriptions.add(subscription)
|
||||
|
||||
# Connect a stub listener to start the update interval polling on first subscriber
|
||||
if self._noop_listener_disconnect is None:
|
||||
self._noop_listener_disconnect = self.async_add_listener(lambda: None)
|
||||
|
||||
# Run the callback on each existing match
|
||||
# We catch all errors to prevent an error in one from stopping the others
|
||||
for match in [job for job in self.current_jobs if subscription.matches(job)]:
|
||||
try:
|
||||
subscription.event_callback(match)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.error(
|
||||
"Error encountered processing Supervisor Job (%s %s %s) - %s",
|
||||
match.name,
|
||||
match.reference,
|
||||
match.uuid,
|
||||
err,
|
||||
)
|
||||
|
||||
def _unsubscribe() -> None:
|
||||
self._subscriptions.discard(subscription)
|
||||
|
||||
# Stop polling if there are no more subscribers
|
||||
if not self._subscriptions and self._noop_listener_disconnect is not None:
|
||||
self._noop_listener_disconnect()
|
||||
self._noop_listener_disconnect = None
|
||||
|
||||
return _unsubscribe
|
||||
|
||||
@callback
|
||||
def _async_refresh_finished(self) -> None:
|
||||
"""Register to receive Supervisor events after the first successful refresh."""
|
||||
if self.last_update_success and self._dispatcher_disconnect is None:
|
||||
self._dispatcher_disconnect = async_dispatcher_connect(
|
||||
self.hass, EVENT_SUPERVISOR_EVENT, self._supervisor_events_to_jobs
|
||||
)
|
||||
|
||||
async def _async_refresh(
|
||||
self,
|
||||
log_failures: bool = True,
|
||||
raise_on_auth_failed: bool = False,
|
||||
scheduled: bool = False,
|
||||
raise_on_entry_error: bool = False,
|
||||
) -> None:
|
||||
"""Refresh data and notify subscribers about cache changes."""
|
||||
previous_jobs = self.data or {}
|
||||
await super()._async_refresh(
|
||||
log_failures, raise_on_auth_failed, scheduled, raise_on_entry_error
|
||||
)
|
||||
if self.last_update_success and self.data is not None:
|
||||
self._process_job_deltas(previous_jobs, self.data)
|
||||
|
||||
async def async_shutdown(self) -> None:
|
||||
"""Shut down the coordinator."""
|
||||
await super().async_shutdown()
|
||||
if self._dispatcher_disconnect:
|
||||
self._dispatcher_disconnect()
|
||||
self._dispatcher_disconnect = None
|
||||
|
||||
@callback
|
||||
def _supervisor_events_to_jobs(self, event: dict[str, Any]) -> None:
|
||||
"""Update job data cache from supervisor events."""
|
||||
if ATTR_WS_EVENT not in event:
|
||||
return
|
||||
|
||||
if (
|
||||
event[ATTR_WS_EVENT] == EVENT_SUPERVISOR_UPDATE
|
||||
and event.get(ATTR_UPDATE_KEY) == UPDATE_KEY_SUPERVISOR
|
||||
and event.get(ATTR_DATA, {}).get(ATTR_STARTUP) == STARTUP_COMPLETE
|
||||
):
|
||||
self.config_entry.async_create_task(self.hass, self.async_request_refresh())
|
||||
|
||||
elif event[ATTR_WS_EVENT] == EVENT_JOB:
|
||||
job = Job.from_dict(event[ATTR_DATA] | {"child_jobs": []})
|
||||
previous_jobs = self.data or {}
|
||||
updated_jobs = {**previous_jobs, job.uuid: job}
|
||||
if job.done:
|
||||
updated_jobs.pop(job.uuid, None)
|
||||
self.async_set_updated_data(updated_jobs)
|
||||
self._process_job_change(job)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HassioMainData:
|
||||
"""Data class for HassioMainDataUpdateCoordinator."""
|
||||
@@ -591,7 +789,6 @@ class HassioAddOnDataUpdateCoordinator(DataUpdateCoordinator[HassioAddonData]):
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
dev_reg: dr.DeviceRegistry,
|
||||
jobs: SupervisorJobs,
|
||||
) -> None:
|
||||
"""Initialize coordinator."""
|
||||
super().__init__(
|
||||
@@ -610,7 +807,6 @@ class HassioAddOnDataUpdateCoordinator(DataUpdateCoordinator[HassioAddonData]):
|
||||
self.dev_reg = dev_reg
|
||||
self._addon_info_subscriptions: defaultdict[str, set[str]] = defaultdict(set)
|
||||
self.supervisor_client = get_supervisor_client(hass)
|
||||
self.jobs = jobs
|
||||
|
||||
@override
|
||||
async def _async_update_data(self) -> HassioAddonData:
|
||||
@@ -800,7 +996,6 @@ class HassioMainDataUpdateCoordinator(DataUpdateCoordinator[HassioMainData]):
|
||||
self.dev_reg = dev_reg
|
||||
self.is_hass_os = False
|
||||
self.supervisor_client = get_supervisor_client(hass)
|
||||
self.jobs = SupervisorJobs(hass)
|
||||
self._dispatcher_disconnect = async_dispatcher_connect(
|
||||
hass, EVENT_SUPERVISOR_EVENT, self._supervisor_event
|
||||
)
|
||||
@@ -854,7 +1049,6 @@ class HassioMainDataUpdateCoordinator(DataUpdateCoordinator[HassioMainData]):
|
||||
),
|
||||
)
|
||||
mounts_info = await client.mounts.info()
|
||||
await self.jobs.refresh_data(is_first_update)
|
||||
except SupervisorError as err:
|
||||
raise UpdateFailed(f"Error on Supervisor API: {err}") from err
|
||||
|
||||
@@ -951,4 +1145,3 @@ class HassioMainDataUpdateCoordinator(DataUpdateCoordinator[HassioMainData]):
|
||||
"""Shut down and clean up when config entry unloaded."""
|
||||
await super().async_shutdown()
|
||||
self._dispatcher_disconnect()
|
||||
self.jobs.unload()
|
||||
|
||||
@@ -1,179 +0,0 @@
|
||||
"""Track Supervisor job data and allow subscription to updates."""
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, replace
|
||||
from functools import partial
|
||||
import logging
|
||||
from typing import Any
|
||||
from uuid import UUID
|
||||
|
||||
from aiohasupervisor.models import Job
|
||||
|
||||
from homeassistant.core import (
|
||||
CALLBACK_TYPE,
|
||||
HomeAssistant,
|
||||
callback,
|
||||
is_callback_check_partial,
|
||||
)
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
|
||||
from .const import (
|
||||
ATTR_DATA,
|
||||
ATTR_STARTUP,
|
||||
ATTR_UPDATE_KEY,
|
||||
ATTR_WS_EVENT,
|
||||
EVENT_JOB,
|
||||
EVENT_SUPERVISOR_EVENT,
|
||||
EVENT_SUPERVISOR_UPDATE,
|
||||
STARTUP_COMPLETE,
|
||||
UPDATE_KEY_SUPERVISOR,
|
||||
)
|
||||
from .handler import get_supervisor_client
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass(slots=True, frozen=True)
|
||||
class JobSubscription:
|
||||
"""Subscribe for updates on jobs which match filters.
|
||||
|
||||
UUID is preferred match but only available in cases of a background API that
|
||||
returns the UUID before taking the action. Others are used to match jobs only
|
||||
if UUID is omitted. Either name or UUID is required to be able to match.
|
||||
|
||||
event_callback must be safe annotated as a homeassistant.core.callback
|
||||
and safe to call in the event loop.
|
||||
"""
|
||||
|
||||
event_callback: Callable[[Job], Any]
|
||||
uuid: str | None = None
|
||||
name: str | None = None
|
||||
reference: str | None = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
"""Validate at least one filter option is present."""
|
||||
if not self.name and not self.uuid:
|
||||
raise ValueError("Either name or uuid must be provided!")
|
||||
if not is_callback_check_partial(self.event_callback):
|
||||
raise ValueError("event_callback must be a homeassistant.core.callback!")
|
||||
|
||||
def matches(self, job: Job) -> bool:
|
||||
"""Return true if job matches subscription filters."""
|
||||
if self.uuid:
|
||||
return job.uuid == self.uuid
|
||||
return job.name == self.name and self.reference in (None, job.reference)
|
||||
|
||||
|
||||
class SupervisorJobs:
|
||||
"""Manage access to Supervisor jobs."""
|
||||
|
||||
def __init__(self, hass: HomeAssistant) -> None:
|
||||
"""Initialize object."""
|
||||
self._hass = hass
|
||||
self._supervisor_client = get_supervisor_client(hass)
|
||||
self._jobs: dict[UUID, Job] = {}
|
||||
self._subscriptions: set[JobSubscription] = set()
|
||||
self._dispatcher_disconnect: Callable[[], None] | None = None
|
||||
|
||||
@property
|
||||
def current_jobs(self) -> list[Job]:
|
||||
"""Return current jobs."""
|
||||
return list(self._jobs.values())
|
||||
|
||||
def subscribe(self, subscription: JobSubscription) -> CALLBACK_TYPE:
|
||||
"""Subscribe to updates for job. Return callback is used to unsubscribe.
|
||||
|
||||
If any jobs match the subscription at the time this is called, runs the
|
||||
callback on them.
|
||||
"""
|
||||
self._subscriptions.add(subscription)
|
||||
|
||||
# Run the callback on each existing match
|
||||
# We catch all errors to prevent an error in one from stopping the others
|
||||
for match in [job for job in self._jobs.values() if subscription.matches(job)]:
|
||||
try:
|
||||
return subscription.event_callback(match)
|
||||
except Exception as err: # noqa: BLE001
|
||||
_LOGGER.error(
|
||||
"Error encountered processing Supervisor Job (%s %s %s) - %s",
|
||||
match.name,
|
||||
match.reference,
|
||||
match.uuid,
|
||||
err,
|
||||
)
|
||||
|
||||
return partial(self._subscriptions.discard, subscription)
|
||||
|
||||
async def refresh_data(self, first_update: bool = False) -> None:
|
||||
"""Refresh job data."""
|
||||
job_data = await self._supervisor_client.jobs.info()
|
||||
job_queue: list[Job] = job_data.jobs.copy()
|
||||
new_jobs: dict[UUID, Job] = {}
|
||||
changed_jobs: list[Job] = []
|
||||
|
||||
# Rebuild our job cache from new info and compare to find changes
|
||||
while job_queue:
|
||||
job = job_queue.pop(0)
|
||||
job_queue.extend(job.child_jobs)
|
||||
job = replace(job, child_jobs=[])
|
||||
|
||||
if job.uuid not in self._jobs or job != self._jobs[job.uuid]:
|
||||
changed_jobs.append(job)
|
||||
new_jobs[job.uuid] = replace(job, child_jobs=[])
|
||||
|
||||
# For any jobs that disappeared which weren't done, tell subscribers they
|
||||
# changed to done. We don't know what else happened to them so leave the
|
||||
# rest of their state as is rather then guessing
|
||||
changed_jobs.extend(
|
||||
[
|
||||
replace(job, done=True)
|
||||
for uuid, job in self._jobs.items()
|
||||
if uuid not in new_jobs and job.done is False
|
||||
]
|
||||
)
|
||||
|
||||
# Replace our cache and inform subscribers of all changes
|
||||
self._jobs = new_jobs
|
||||
for job in changed_jobs:
|
||||
self._process_job_change(job)
|
||||
|
||||
# If this is the first update register to receive Supervisor events
|
||||
if first_update:
|
||||
self._dispatcher_disconnect = async_dispatcher_connect(
|
||||
self._hass, EVENT_SUPERVISOR_EVENT, self._supervisor_events_to_jobs
|
||||
)
|
||||
|
||||
@callback
|
||||
def _supervisor_events_to_jobs(self, event: dict[str, Any]) -> None:
|
||||
"""Update job data cache from supervisor events."""
|
||||
if ATTR_WS_EVENT not in event:
|
||||
return
|
||||
|
||||
if (
|
||||
event[ATTR_WS_EVENT] == EVENT_SUPERVISOR_UPDATE
|
||||
and event.get(ATTR_UPDATE_KEY) == UPDATE_KEY_SUPERVISOR
|
||||
and event.get(ATTR_DATA, {}).get(ATTR_STARTUP) == STARTUP_COMPLETE
|
||||
):
|
||||
self._hass.async_create_task(self.refresh_data())
|
||||
|
||||
elif event[ATTR_WS_EVENT] == EVENT_JOB:
|
||||
job = Job.from_dict(event[ATTR_DATA] | {"child_jobs": []})
|
||||
self._jobs[job.uuid] = job
|
||||
self._process_job_change(job)
|
||||
|
||||
def _process_job_change(self, job: Job) -> None:
|
||||
"""Process a job change by triggering callbacks on subscribers."""
|
||||
for sub in self._subscriptions:
|
||||
if sub.matches(job):
|
||||
sub.event_callback(job)
|
||||
|
||||
# If the job is done, pop it from our cache if present after processing is done
|
||||
if job.done and job.uuid in self._jobs:
|
||||
del self._jobs[job.uuid]
|
||||
|
||||
@callback
|
||||
def unload(self) -> None:
|
||||
"""Unregister with dispatcher on config entry unload."""
|
||||
if self._dispatcher_disconnect:
|
||||
self._dispatcher_disconnect()
|
||||
self._dispatcher_disconnect = None
|
||||
@@ -17,15 +17,19 @@ from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
|
||||
|
||||
from .const import ADDONS_COORDINATOR, ATTR_VERSION_LATEST, MAIN_COORDINATOR
|
||||
from .coordinator import AddonData
|
||||
from .const import (
|
||||
ADDONS_COORDINATOR,
|
||||
ATTR_VERSION_LATEST,
|
||||
JOBS_COORDINATOR,
|
||||
MAIN_COORDINATOR,
|
||||
)
|
||||
from .coordinator import AddonData, JobSubscription
|
||||
from .entity import (
|
||||
HassioAddonEntity,
|
||||
HassioCoreEntity,
|
||||
HassioOSEntity,
|
||||
HassioSupervisorEntity,
|
||||
)
|
||||
from .jobs import JobSubscription
|
||||
from .update_helper import update_addon, update_core, update_os
|
||||
|
||||
ENTITY_DESCRIPTION = UpdateEntityDescription(
|
||||
@@ -216,7 +220,7 @@ class SupervisorAddonUpdateEntity(HassioAddonEntity, UpdateEntity):
|
||||
"""Subscribe to progress updates."""
|
||||
await super().async_added_to_hass()
|
||||
self.async_on_remove(
|
||||
self.coordinator.jobs.subscribe(
|
||||
self.hass.data[JOBS_COORDINATOR].subscribe(
|
||||
JobSubscription(
|
||||
self._update_job_changed,
|
||||
name="addon_manager_update",
|
||||
@@ -388,7 +392,7 @@ class SupervisorSupervisorUpdateEntity(HassioSupervisorEntity, UpdateEntity):
|
||||
"""Subscribe to progress updates."""
|
||||
await super().async_added_to_hass()
|
||||
self.async_on_remove(
|
||||
self.coordinator.jobs.subscribe(
|
||||
self.hass.data[JOBS_COORDINATOR].subscribe(
|
||||
JobSubscription(self._update_job_changed, name="supervisor_update")
|
||||
)
|
||||
)
|
||||
@@ -458,7 +462,7 @@ class SupervisorCoreUpdateEntity(HassioCoreEntity, UpdateEntity):
|
||||
"""Subscribe to progress updates."""
|
||||
await super().async_added_to_hass()
|
||||
self.async_on_remove(
|
||||
self.coordinator.jobs.subscribe(
|
||||
self.hass.data[JOBS_COORDINATOR].subscribe(
|
||||
JobSubscription(
|
||||
self._update_job_changed, name="home_assistant_core_update"
|
||||
)
|
||||
|
||||
@@ -899,10 +899,6 @@ def supervisor_client() -> Generator[AsyncMock]:
|
||||
"homeassistant.components.hassio.issues.get_supervisor_client",
|
||||
return_value=supervisor_client,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.hassio.jobs.get_supervisor_client",
|
||||
return_value=supervisor_client,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.hassio.repairs.get_supervisor_client",
|
||||
return_value=supervisor_client,
|
||||
|
||||
@@ -9,14 +9,25 @@ from uuid import uuid4
|
||||
from aiohasupervisor.models import Job, JobsInfo
|
||||
import pytest
|
||||
|
||||
from homeassistant.components.hassio.const import DOMAIN, MAIN_COORDINATOR
|
||||
from homeassistant.components.hassio.coordinator import HassioMainDataUpdateCoordinator
|
||||
from homeassistant.components.hassio.jobs import JobSubscription
|
||||
from homeassistant.components.hassio.const import (
|
||||
DOMAIN,
|
||||
JOBS_COORDINATOR,
|
||||
MAIN_COORDINATOR,
|
||||
REQUEST_REFRESH_DELAY,
|
||||
SUPERVISOR_JOBS_UPDATE_INTERVAL,
|
||||
)
|
||||
from homeassistant.components.hassio.coordinator import (
|
||||
HassioMainDataUpdateCoordinator,
|
||||
JobSubscription,
|
||||
SupervisorJobsCoordinator,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .test_init import MOCK_ENVIRON
|
||||
|
||||
from tests.common import async_fire_time_changed
|
||||
from tests.typing import WebSocketGenerator
|
||||
|
||||
|
||||
@@ -65,10 +76,10 @@ async def test_job_manager_setup(hass: HomeAssistant, jobs_info: AsyncMock) -> N
|
||||
assert result
|
||||
jobs_info.assert_called_once()
|
||||
|
||||
data_coordinator: HassioMainDataUpdateCoordinator = hass.data[MAIN_COORDINATOR]
|
||||
assert len(data_coordinator.jobs.current_jobs) == 2
|
||||
assert data_coordinator.jobs.current_jobs[0].name == "test_job"
|
||||
assert data_coordinator.jobs.current_jobs[1].name == "test_inner_job"
|
||||
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
|
||||
assert len(jobs_coordinator.current_jobs) == 2
|
||||
assert jobs_coordinator.current_jobs[0].name == "test_job"
|
||||
assert jobs_coordinator.current_jobs[1].name == "test_inner_job"
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("all_setup_requests")
|
||||
@@ -100,8 +111,8 @@ async def test_job_manager_ws_updates(
|
||||
|
||||
jobs_info.reset_mock()
|
||||
client = await hass_supervisor_ws_client()
|
||||
data_coordinator: HassioMainDataUpdateCoordinator = hass.data[MAIN_COORDINATOR]
|
||||
assert not data_coordinator.jobs.current_jobs
|
||||
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
|
||||
assert not jobs_coordinator.current_jobs
|
||||
|
||||
# Make an example listener
|
||||
job_data: Job | None = None
|
||||
@@ -114,7 +125,7 @@ async def test_job_manager_ws_updates(
|
||||
subscription = JobSubscription(
|
||||
mock_subcription_callback, name="test_job", reference="test"
|
||||
)
|
||||
unsubscribe = data_coordinator.jobs.subscribe(subscription)
|
||||
unsubscribe = jobs_coordinator.subscribe(subscription)
|
||||
|
||||
# Send start of job update
|
||||
await client.send_json(
|
||||
@@ -146,7 +157,7 @@ async def test_job_manager_ws_updates(
|
||||
assert job_data.progress == 0
|
||||
assert job_data.done is False
|
||||
# One job in the cache
|
||||
assert len(data_coordinator.jobs.current_jobs) == 1
|
||||
assert len(jobs_coordinator.current_jobs) == 1
|
||||
|
||||
# Example progress update
|
||||
await client.send_json(
|
||||
@@ -178,7 +189,7 @@ async def test_job_manager_ws_updates(
|
||||
assert job_data.progress == 50
|
||||
assert job_data.done is False
|
||||
# Same job, same number of jobs in cache
|
||||
assert len(data_coordinator.jobs.current_jobs) == 1
|
||||
assert len(jobs_coordinator.current_jobs) == 1
|
||||
|
||||
# Unrelated job update - name change, subscriber should not receive
|
||||
await client.send_json(
|
||||
@@ -208,7 +219,7 @@ async def test_job_manager_ws_updates(
|
||||
assert job_data.name == "test_job"
|
||||
assert job_data.reference == "test"
|
||||
# New job, cache increases
|
||||
assert len(data_coordinator.jobs.current_jobs) == 2
|
||||
assert len(jobs_coordinator.current_jobs) == 2
|
||||
|
||||
# Unrelated job update - reference change, subscriber should not receive
|
||||
await client.send_json(
|
||||
@@ -238,7 +249,7 @@ async def test_job_manager_ws_updates(
|
||||
assert job_data.name == "test_job"
|
||||
assert job_data.reference == "test"
|
||||
# New job, cache increases
|
||||
assert len(data_coordinator.jobs.current_jobs) == 3
|
||||
assert len(jobs_coordinator.current_jobs) == 3
|
||||
|
||||
# Unsubscribe mock listener, should not receive final update
|
||||
unsubscribe()
|
||||
@@ -271,7 +282,7 @@ async def test_job_manager_ws_updates(
|
||||
assert job_data.progress == 50
|
||||
assert job_data.done is False
|
||||
# Job ended, cache decreases
|
||||
assert len(data_coordinator.jobs.current_jobs) == 2
|
||||
assert len(jobs_coordinator.current_jobs) == 2
|
||||
|
||||
# REST API should not be used during this sequence
|
||||
jobs_info.assert_not_called()
|
||||
@@ -306,9 +317,9 @@ async def test_job_manager_reload_on_supervisor_restart(
|
||||
assert result
|
||||
jobs_info.assert_called_once()
|
||||
|
||||
data_coordinator: HassioMainDataUpdateCoordinator = hass.data[MAIN_COORDINATOR]
|
||||
assert len(data_coordinator.jobs.current_jobs) == 1
|
||||
assert data_coordinator.jobs.current_jobs[0].name == "test_job"
|
||||
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
|
||||
assert len(jobs_coordinator.current_jobs) == 1
|
||||
assert jobs_coordinator.current_jobs[0].name == "test_job"
|
||||
|
||||
jobs_info.reset_mock()
|
||||
jobs_info.return_value = JobsInfo(ignore_conditions=[], jobs=[])
|
||||
@@ -323,7 +334,7 @@ async def test_job_manager_reload_on_supervisor_restart(
|
||||
job_data = job
|
||||
|
||||
subscription = JobSubscription(mock_subcription_callback, name="test_job")
|
||||
data_coordinator.jobs.subscribe(subscription)
|
||||
jobs_coordinator.subscribe(subscription)
|
||||
|
||||
# Send supervisor restart signal
|
||||
await client.send_json(
|
||||
@@ -341,9 +352,67 @@ async def test_job_manager_reload_on_supervisor_restart(
|
||||
assert msg["success"]
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance time past the debouncer cooldown for the refresh to complete
|
||||
async_fire_time_changed(
|
||||
hass, dt_util.utcnow() + dt_util.dt.timedelta(seconds=REQUEST_REFRESH_DELAY + 1)
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Listener should be told job is done and cache cleared out
|
||||
jobs_info.assert_called_once()
|
||||
assert job_data.name == "test_job"
|
||||
assert job_data.reference == "test"
|
||||
assert job_data.done is True
|
||||
assert not data_coordinator.jobs.current_jobs
|
||||
assert not jobs_coordinator.current_jobs
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("all_setup_requests")
|
||||
async def test_job_manager_periodic_refresh(
|
||||
hass: HomeAssistant, jobs_info: AsyncMock
|
||||
) -> None:
|
||||
"""Test job manager performs periodic refresh as backstop for dropped WS events."""
|
||||
jobs_info.return_value = JobsInfo(
|
||||
ignore_conditions=[],
|
||||
jobs=[
|
||||
Job(
|
||||
name="test_job",
|
||||
reference="test",
|
||||
uuid=uuid4(),
|
||||
progress=0,
|
||||
stage=None,
|
||||
done=False,
|
||||
errors=[],
|
||||
created=datetime.now(), # pylint: disable=home-assistant-enforce-naive-now
|
||||
extra=None,
|
||||
child_jobs=[],
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
result = await async_setup_component(hass, DOMAIN, {})
|
||||
assert result
|
||||
jobs_info.assert_called_once()
|
||||
|
||||
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
|
||||
assert len(jobs_coordinator.current_jobs) == 1
|
||||
|
||||
# Subscribe to job updates
|
||||
job_data: Job | None = None
|
||||
|
||||
@callback
|
||||
def mock_subscription_callback(job: Job) -> None:
|
||||
nonlocal job_data
|
||||
job_data = job
|
||||
|
||||
subscription = JobSubscription(mock_subscription_callback, name="test_job")
|
||||
jobs_coordinator.subscribe(subscription)
|
||||
|
||||
# Reset mock to verify periodic refresh
|
||||
jobs_info.reset_mock()
|
||||
|
||||
# Advance time past the SUPERVISOR_JOBS_UPDATE_INTERVAL to trigger periodic refresh
|
||||
async_fire_time_changed(hass, dt_util.utcnow() + SUPERVISOR_JOBS_UPDATE_INTERVAL)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Periodic refresh should have called jobs_info
|
||||
jobs_info.assert_called()
|
||||
|
||||
Reference in New Issue
Block a user