Compare commits

...

5 Commits

Author SHA1 Message Date
Mike Degatano be14ef7158 Fix bad return and add test for polling 2026-06-22 19:18:29 +00:00
Mike Degatano b6166ac675 Add a stub listener to start the polling interval when at least one subscriber 2026-06-22 19:18:23 +00:00
Mike Degatano 1e41509c0e Create task on config entry and add back regular refreshes of SupervisorJobs to match previous behavior 2026-06-22 19:18:17 +00:00
Mike Degatano 53ed4c42bd Clean up from review and separate jobs coordinator to be top level like others 2026-06-22 19:18:11 +00:00
Mike Degatano 21f7836117 Refactor SupervisorJobs into a DataUpdateCoordinator
SupervisorJobs previously maintained its own job cache and lifecycle
using a manual dispatcher subscription and a refresh_data() method
called directly by HassioMainDataUpdateCoordinator.

Refactor it to be a proper DataUpdateCoordinator[dict[UUID, Job]]
alongside the other hassio coordinators:

- _async_update_data() fetches and flattens the job cache from Supervisor
- _async_refresh_finished() registers the dispatcher listener after
  the first successful refresh (replacing the first_update flag)
- _async_refresh() is overridden to compute deltas and notify
  job subscribers whenever the polled cache changes
- WS job events call async_set_updated_data() to update the cache
  in-place; supervisor restart events call async_request_refresh()
- async_shutdown() disconnects the dispatcher; the coordinator
  shutdown itself is handled via config_entry.async_on_unload

JobSubscription and SupervisorJobs are moved from jobs.py into
coordinator.py (per the home-assistant-enforce-class-module rule)
and jobs.py is deleted. All callers updated accordingly.
2026-06-22 19:18:05 +00:00
7 changed files with 314 additions and 221 deletions
+8 -3
View File
@@ -60,6 +60,7 @@ from .const import (
DATA_HASSIO_SUPERVISOR_USER,
DATA_KEY_SUPERVISOR_ISSUES,
DOMAIN,
JOBS_COORDINATOR,
MAIN_COORDINATOR,
STATS_COORDINATOR,
)
@@ -67,6 +68,7 @@ from .coordinator import (
HassioAddOnDataUpdateCoordinator,
HassioMainDataUpdateCoordinator,
HassioStatsDataUpdateCoordinator,
SupervisorJobsCoordinator,
get_addons_info,
get_addons_list,
get_addons_stats,
@@ -326,9 +328,11 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
await coordinator.async_config_entry_first_refresh()
hass.data[MAIN_COORDINATOR] = coordinator
addon_coordinator = HassioAddOnDataUpdateCoordinator(
hass, entry, dev_reg, coordinator.jobs
)
jobs_coordinator = SupervisorJobsCoordinator(hass, entry)
await jobs_coordinator.async_config_entry_first_refresh()
hass.data[JOBS_COORDINATOR] = jobs_coordinator
addon_coordinator = HassioAddOnDataUpdateCoordinator(hass, entry, dev_reg)
await addon_coordinator.async_config_entry_first_refresh()
hass.data[ADDONS_COORDINATOR] = addon_coordinator
@@ -437,5 +441,6 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
hass.data.pop(MAIN_COORDINATOR, None)
hass.data.pop(ADDONS_COORDINATOR, None)
hass.data.pop(STATS_COORDINATOR, None)
hass.data.pop(JOBS_COORDINATOR, None)
return unload_ok
+5
View File
@@ -27,6 +27,7 @@ if TYPE_CHECKING:
HassioAddOnDataUpdateCoordinator,
HassioMainDataUpdateCoordinator,
HassioStatsDataUpdateCoordinator,
SupervisorJobsCoordinator,
)
from .handler import HassIO
from .issues import SupervisorIssues
@@ -103,6 +104,9 @@ ADDONS_COORDINATOR: HassKey[HassioAddOnDataUpdateCoordinator] = HassKey(
STATS_COORDINATOR: HassKey[HassioStatsDataUpdateCoordinator] = HassKey(
"hassio_stats_coordinator"
)
JOBS_COORDINATOR: HassKey[SupervisorJobsCoordinator] = HassKey(
"hassio_jobs_coordinator"
)
DATA_COMPONENT: HassKey[HassIO] = HassKey(DOMAIN)
@@ -126,6 +130,7 @@ DATA_ADDONS_LIST: HassKey[list[InstalledAddon]] = HassKey("hassio_addons_list")
HASSIO_MAIN_UPDATE_INTERVAL = timedelta(minutes=5)
HASSIO_ADDON_UPDATE_INTERVAL = timedelta(minutes=15)
HASSIO_STATS_UPDATE_INTERVAL = timedelta(seconds=60)
SUPERVISOR_JOBS_UPDATE_INTERVAL = timedelta(minutes=15)
ATTR_AUTO_UPDATE = "auto_update"
ATTR_VERSION = "version"
+202 -9
View File
@@ -2,10 +2,11 @@
import asyncio
from collections import defaultdict
from collections.abc import Awaitable
from dataclasses import dataclass
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, replace
import logging
from typing import TYPE_CHECKING, Any, cast, override
from uuid import UUID
from aiohasupervisor import SupervisorError, SupervisorNotFoundError
from aiohasupervisor.models import (
@@ -17,6 +18,7 @@ from aiohasupervisor.models import (
HostInfo,
InstalledAddon,
InstalledAddonComplete,
Job,
NetworkInfo,
NFSMountResponse,
OSInfo,
@@ -29,7 +31,12 @@ from aiohasupervisor.models import (
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import ATTR_MANUFACTURER
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.core import (
CALLBACK_TYPE,
HomeAssistant,
callback,
is_callback_check_partial,
)
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.device_registry import DeviceInfo
@@ -59,6 +66,7 @@ from .const import (
DATA_SUPERVISOR_INFO,
DATA_SUPERVISOR_STATS,
DOMAIN,
EVENT_JOB,
EVENT_SUPERVISOR_EVENT,
EVENT_SUPERVISOR_UPDATE,
HASSIO_ADDON_UPDATE_INTERVAL,
@@ -67,12 +75,12 @@ from .const import (
REQUEST_REFRESH_DELAY,
STARTUP_COMPLETE,
SUPERVISOR_CONTAINER,
SUPERVISOR_JOBS_UPDATE_INTERVAL,
UPDATE_KEY_SUPERVISOR,
SupervisorEntityModel,
)
from .exceptions import HassioNotReadyError
from .handler import get_supervisor_client
from .jobs import SupervisorJobs
if TYPE_CHECKING:
from .issues import SupervisorIssues
@@ -80,6 +88,196 @@ if TYPE_CHECKING:
_LOGGER = logging.getLogger(__name__)
@dataclass(slots=True, frozen=True)
class JobSubscription:
"""Subscribe for updates on jobs which match filters.
UUID is preferred match but only available in cases of a background API that
returns the UUID before taking the action. Others are used to match jobs only
if UUID is omitted. Either name or UUID is required to be able to match.
event_callback must be safe annotated as a homeassistant.core.callback
and safe to call in the event loop.
"""
event_callback: Callable[[Job], None]
uuid: str | None = None
name: str | None = None
reference: str | None = None
def __post_init__(self) -> None:
"""Validate at least one filter option is present."""
if not self.name and not self.uuid:
raise ValueError("Either name or uuid must be provided!")
if not is_callback_check_partial(self.event_callback):
raise ValueError("event_callback must be a homeassistant.core.callback!")
def matches(self, job: Job) -> bool:
"""Return true if job matches subscription filters."""
if self.uuid:
return job.uuid == self.uuid
return job.name == self.name and self.reference in (None, job.reference)
class SupervisorJobsCoordinator(DataUpdateCoordinator[dict[UUID, Job]]):
"""Manage access to Supervisor jobs."""
config_entry: ConfigEntry
def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
"""Initialize object."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name="SupervisorJobsCoordinator",
update_interval=SUPERVISOR_JOBS_UPDATE_INTERVAL,
# We don't want an immediate refresh since we want to avoid
# hammering the Supervisor API on startup
request_refresh_debouncer=Debouncer(
hass, _LOGGER, cooldown=REQUEST_REFRESH_DELAY, immediate=False
),
)
self._supervisor_client = get_supervisor_client(hass)
self._subscriptions: set[JobSubscription] = set()
self._dispatcher_disconnect: Callable[[], None] | None = None
self._noop_listener_disconnect: Callable[[], None] | None = None
@property
def current_jobs(self) -> list[Job]:
"""Return current jobs."""
return list(self.data.values()) if self.data is not None else []
@staticmethod
def _build_jobs(jobs: list[Job]) -> dict[UUID, Job]:
"""Flatten jobs and child jobs into a UUID keyed cache."""
job_queue: list[Job] = jobs.copy()
cached_jobs: dict[UUID, Job] = {}
while job_queue:
job = job_queue.pop(0)
job_queue.extend(job.child_jobs)
cached_jobs[job.uuid] = replace(job, child_jobs=[])
return cached_jobs
async def _async_update_data(self) -> dict[UUID, Job]:
"""Fetch data from Supervisor."""
job_data = await self._supervisor_client.jobs.info()
return self._build_jobs(job_data.jobs)
def _process_job_change(self, job: Job) -> None:
"""Process a job change by triggering callbacks on subscribers."""
for sub in self._subscriptions:
if sub.matches(job):
sub.event_callback(job)
def _process_job_deltas(
self,
previous_jobs: dict[UUID, Job],
current_jobs: dict[UUID, Job],
) -> None:
"""Notify subscribers about changes between two job caches."""
for job in current_jobs.values():
if (previous_job := previous_jobs.get(job.uuid)) is not None and (
previous_job == job
):
continue
self._process_job_change(job)
for uuid, job in previous_jobs.items():
if uuid not in current_jobs and job.done is False:
self._process_job_change(replace(job, done=True))
def subscribe(self, subscription: JobSubscription) -> CALLBACK_TYPE:
"""Subscribe to updates for job. Return callback is used to unsubscribe.
If any jobs match the subscription at the time this is called, runs the
callback on them.
"""
self._subscriptions.add(subscription)
# Connect a stub listener to start the update interval polling on first subscriber
if self._noop_listener_disconnect is None:
self._noop_listener_disconnect = self.async_add_listener(lambda: None)
# Run the callback on each existing match
# We catch all errors to prevent an error in one from stopping the others
for match in [job for job in self.current_jobs if subscription.matches(job)]:
try:
subscription.event_callback(match)
except Exception as err: # noqa: BLE001
_LOGGER.error(
"Error encountered processing Supervisor Job (%s %s %s) - %s",
match.name,
match.reference,
match.uuid,
err,
)
def _unsubscribe() -> None:
self._subscriptions.discard(subscription)
# Stop polling if there are no more subscribers
if not self._subscriptions and self._noop_listener_disconnect is not None:
self._noop_listener_disconnect()
self._noop_listener_disconnect = None
return _unsubscribe
@callback
def _async_refresh_finished(self) -> None:
"""Register to receive Supervisor events after the first successful refresh."""
if self.last_update_success and self._dispatcher_disconnect is None:
self._dispatcher_disconnect = async_dispatcher_connect(
self.hass, EVENT_SUPERVISOR_EVENT, self._supervisor_events_to_jobs
)
async def _async_refresh(
self,
log_failures: bool = True,
raise_on_auth_failed: bool = False,
scheduled: bool = False,
raise_on_entry_error: bool = False,
) -> None:
"""Refresh data and notify subscribers about cache changes."""
previous_jobs = self.data or {}
await super()._async_refresh(
log_failures, raise_on_auth_failed, scheduled, raise_on_entry_error
)
if self.last_update_success and self.data is not None:
self._process_job_deltas(previous_jobs, self.data)
async def async_shutdown(self) -> None:
"""Shut down the coordinator."""
await super().async_shutdown()
if self._dispatcher_disconnect:
self._dispatcher_disconnect()
self._dispatcher_disconnect = None
@callback
def _supervisor_events_to_jobs(self, event: dict[str, Any]) -> None:
"""Update job data cache from supervisor events."""
if ATTR_WS_EVENT not in event:
return
if (
event[ATTR_WS_EVENT] == EVENT_SUPERVISOR_UPDATE
and event.get(ATTR_UPDATE_KEY) == UPDATE_KEY_SUPERVISOR
and event.get(ATTR_DATA, {}).get(ATTR_STARTUP) == STARTUP_COMPLETE
):
self.config_entry.async_create_task(self.hass, self.async_request_refresh())
elif event[ATTR_WS_EVENT] == EVENT_JOB:
job = Job.from_dict(event[ATTR_DATA] | {"child_jobs": []})
previous_jobs = self.data or {}
updated_jobs = {**previous_jobs, job.uuid: job}
if job.done:
updated_jobs.pop(job.uuid, None)
self.async_set_updated_data(updated_jobs)
self._process_job_change(job)
@dataclass
class HassioMainData:
"""Data class for HassioMainDataUpdateCoordinator."""
@@ -591,7 +789,6 @@ class HassioAddOnDataUpdateCoordinator(DataUpdateCoordinator[HassioAddonData]):
hass: HomeAssistant,
config_entry: ConfigEntry,
dev_reg: dr.DeviceRegistry,
jobs: SupervisorJobs,
) -> None:
"""Initialize coordinator."""
super().__init__(
@@ -610,7 +807,6 @@ class HassioAddOnDataUpdateCoordinator(DataUpdateCoordinator[HassioAddonData]):
self.dev_reg = dev_reg
self._addon_info_subscriptions: defaultdict[str, set[str]] = defaultdict(set)
self.supervisor_client = get_supervisor_client(hass)
self.jobs = jobs
@override
async def _async_update_data(self) -> HassioAddonData:
@@ -800,7 +996,6 @@ class HassioMainDataUpdateCoordinator(DataUpdateCoordinator[HassioMainData]):
self.dev_reg = dev_reg
self.is_hass_os = False
self.supervisor_client = get_supervisor_client(hass)
self.jobs = SupervisorJobs(hass)
self._dispatcher_disconnect = async_dispatcher_connect(
hass, EVENT_SUPERVISOR_EVENT, self._supervisor_event
)
@@ -854,7 +1049,6 @@ class HassioMainDataUpdateCoordinator(DataUpdateCoordinator[HassioMainData]):
),
)
mounts_info = await client.mounts.info()
await self.jobs.refresh_data(is_first_update)
except SupervisorError as err:
raise UpdateFailed(f"Error on Supervisor API: {err}") from err
@@ -951,4 +1145,3 @@ class HassioMainDataUpdateCoordinator(DataUpdateCoordinator[HassioMainData]):
"""Shut down and clean up when config entry unloaded."""
await super().async_shutdown()
self._dispatcher_disconnect()
self.jobs.unload()
-179
View File
@@ -1,179 +0,0 @@
"""Track Supervisor job data and allow subscription to updates."""
from collections.abc import Callable
from dataclasses import dataclass, replace
from functools import partial
import logging
from typing import Any
from uuid import UUID
from aiohasupervisor.models import Job
from homeassistant.core import (
CALLBACK_TYPE,
HomeAssistant,
callback,
is_callback_check_partial,
)
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import (
ATTR_DATA,
ATTR_STARTUP,
ATTR_UPDATE_KEY,
ATTR_WS_EVENT,
EVENT_JOB,
EVENT_SUPERVISOR_EVENT,
EVENT_SUPERVISOR_UPDATE,
STARTUP_COMPLETE,
UPDATE_KEY_SUPERVISOR,
)
from .handler import get_supervisor_client
_LOGGER = logging.getLogger(__name__)
@dataclass(slots=True, frozen=True)
class JobSubscription:
"""Subscribe for updates on jobs which match filters.
UUID is preferred match but only available in cases of a background API that
returns the UUID before taking the action. Others are used to match jobs only
if UUID is omitted. Either name or UUID is required to be able to match.
event_callback must be safe annotated as a homeassistant.core.callback
and safe to call in the event loop.
"""
event_callback: Callable[[Job], Any]
uuid: str | None = None
name: str | None = None
reference: str | None = None
def __post_init__(self) -> None:
"""Validate at least one filter option is present."""
if not self.name and not self.uuid:
raise ValueError("Either name or uuid must be provided!")
if not is_callback_check_partial(self.event_callback):
raise ValueError("event_callback must be a homeassistant.core.callback!")
def matches(self, job: Job) -> bool:
"""Return true if job matches subscription filters."""
if self.uuid:
return job.uuid == self.uuid
return job.name == self.name and self.reference in (None, job.reference)
class SupervisorJobs:
"""Manage access to Supervisor jobs."""
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize object."""
self._hass = hass
self._supervisor_client = get_supervisor_client(hass)
self._jobs: dict[UUID, Job] = {}
self._subscriptions: set[JobSubscription] = set()
self._dispatcher_disconnect: Callable[[], None] | None = None
@property
def current_jobs(self) -> list[Job]:
"""Return current jobs."""
return list(self._jobs.values())
def subscribe(self, subscription: JobSubscription) -> CALLBACK_TYPE:
"""Subscribe to updates for job. Return callback is used to unsubscribe.
If any jobs match the subscription at the time this is called, runs the
callback on them.
"""
self._subscriptions.add(subscription)
# Run the callback on each existing match
# We catch all errors to prevent an error in one from stopping the others
for match in [job for job in self._jobs.values() if subscription.matches(job)]:
try:
return subscription.event_callback(match)
except Exception as err: # noqa: BLE001
_LOGGER.error(
"Error encountered processing Supervisor Job (%s %s %s) - %s",
match.name,
match.reference,
match.uuid,
err,
)
return partial(self._subscriptions.discard, subscription)
async def refresh_data(self, first_update: bool = False) -> None:
"""Refresh job data."""
job_data = await self._supervisor_client.jobs.info()
job_queue: list[Job] = job_data.jobs.copy()
new_jobs: dict[UUID, Job] = {}
changed_jobs: list[Job] = []
# Rebuild our job cache from new info and compare to find changes
while job_queue:
job = job_queue.pop(0)
job_queue.extend(job.child_jobs)
job = replace(job, child_jobs=[])
if job.uuid not in self._jobs or job != self._jobs[job.uuid]:
changed_jobs.append(job)
new_jobs[job.uuid] = replace(job, child_jobs=[])
# For any jobs that disappeared which weren't done, tell subscribers they
# changed to done. We don't know what else happened to them so leave the
# rest of their state as is rather then guessing
changed_jobs.extend(
[
replace(job, done=True)
for uuid, job in self._jobs.items()
if uuid not in new_jobs and job.done is False
]
)
# Replace our cache and inform subscribers of all changes
self._jobs = new_jobs
for job in changed_jobs:
self._process_job_change(job)
# If this is the first update register to receive Supervisor events
if first_update:
self._dispatcher_disconnect = async_dispatcher_connect(
self._hass, EVENT_SUPERVISOR_EVENT, self._supervisor_events_to_jobs
)
@callback
def _supervisor_events_to_jobs(self, event: dict[str, Any]) -> None:
"""Update job data cache from supervisor events."""
if ATTR_WS_EVENT not in event:
return
if (
event[ATTR_WS_EVENT] == EVENT_SUPERVISOR_UPDATE
and event.get(ATTR_UPDATE_KEY) == UPDATE_KEY_SUPERVISOR
and event.get(ATTR_DATA, {}).get(ATTR_STARTUP) == STARTUP_COMPLETE
):
self._hass.async_create_task(self.refresh_data())
elif event[ATTR_WS_EVENT] == EVENT_JOB:
job = Job.from_dict(event[ATTR_DATA] | {"child_jobs": []})
self._jobs[job.uuid] = job
self._process_job_change(job)
def _process_job_change(self, job: Job) -> None:
"""Process a job change by triggering callbacks on subscribers."""
for sub in self._subscriptions:
if sub.matches(job):
sub.event_callback(job)
# If the job is done, pop it from our cache if present after processing is done
if job.done and job.uuid in self._jobs:
del self._jobs[job.uuid]
@callback
def unload(self) -> None:
"""Unregister with dispatcher on config entry unload."""
if self._dispatcher_disconnect:
self._dispatcher_disconnect()
self._dispatcher_disconnect = None
+10 -6
View File
@@ -17,15 +17,19 @@ from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import ADDONS_COORDINATOR, ATTR_VERSION_LATEST, MAIN_COORDINATOR
from .coordinator import AddonData
from .const import (
ADDONS_COORDINATOR,
ATTR_VERSION_LATEST,
JOBS_COORDINATOR,
MAIN_COORDINATOR,
)
from .coordinator import AddonData, JobSubscription
from .entity import (
HassioAddonEntity,
HassioCoreEntity,
HassioOSEntity,
HassioSupervisorEntity,
)
from .jobs import JobSubscription
from .update_helper import update_addon, update_core, update_os
ENTITY_DESCRIPTION = UpdateEntityDescription(
@@ -216,7 +220,7 @@ class SupervisorAddonUpdateEntity(HassioAddonEntity, UpdateEntity):
"""Subscribe to progress updates."""
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator.jobs.subscribe(
self.hass.data[JOBS_COORDINATOR].subscribe(
JobSubscription(
self._update_job_changed,
name="addon_manager_update",
@@ -388,7 +392,7 @@ class SupervisorSupervisorUpdateEntity(HassioSupervisorEntity, UpdateEntity):
"""Subscribe to progress updates."""
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator.jobs.subscribe(
self.hass.data[JOBS_COORDINATOR].subscribe(
JobSubscription(self._update_job_changed, name="supervisor_update")
)
)
@@ -458,7 +462,7 @@ class SupervisorCoreUpdateEntity(HassioCoreEntity, UpdateEntity):
"""Subscribe to progress updates."""
await super().async_added_to_hass()
self.async_on_remove(
self.coordinator.jobs.subscribe(
self.hass.data[JOBS_COORDINATOR].subscribe(
JobSubscription(
self._update_job_changed, name="home_assistant_core_update"
)
-4
View File
@@ -899,10 +899,6 @@ def supervisor_client() -> Generator[AsyncMock]:
"homeassistant.components.hassio.issues.get_supervisor_client",
return_value=supervisor_client,
),
patch(
"homeassistant.components.hassio.jobs.get_supervisor_client",
return_value=supervisor_client,
),
patch(
"homeassistant.components.hassio.repairs.get_supervisor_client",
return_value=supervisor_client,
+89 -20
View File
@@ -9,14 +9,25 @@ from uuid import uuid4
from aiohasupervisor.models import Job, JobsInfo
import pytest
from homeassistant.components.hassio.const import DOMAIN, MAIN_COORDINATOR
from homeassistant.components.hassio.coordinator import HassioMainDataUpdateCoordinator
from homeassistant.components.hassio.jobs import JobSubscription
from homeassistant.components.hassio.const import (
DOMAIN,
JOBS_COORDINATOR,
MAIN_COORDINATOR,
REQUEST_REFRESH_DELAY,
SUPERVISOR_JOBS_UPDATE_INTERVAL,
)
from homeassistant.components.hassio.coordinator import (
HassioMainDataUpdateCoordinator,
JobSubscription,
SupervisorJobsCoordinator,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from .test_init import MOCK_ENVIRON
from tests.common import async_fire_time_changed
from tests.typing import WebSocketGenerator
@@ -65,10 +76,10 @@ async def test_job_manager_setup(hass: HomeAssistant, jobs_info: AsyncMock) -> N
assert result
jobs_info.assert_called_once()
data_coordinator: HassioMainDataUpdateCoordinator = hass.data[MAIN_COORDINATOR]
assert len(data_coordinator.jobs.current_jobs) == 2
assert data_coordinator.jobs.current_jobs[0].name == "test_job"
assert data_coordinator.jobs.current_jobs[1].name == "test_inner_job"
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
assert len(jobs_coordinator.current_jobs) == 2
assert jobs_coordinator.current_jobs[0].name == "test_job"
assert jobs_coordinator.current_jobs[1].name == "test_inner_job"
@pytest.mark.usefixtures("all_setup_requests")
@@ -100,8 +111,8 @@ async def test_job_manager_ws_updates(
jobs_info.reset_mock()
client = await hass_supervisor_ws_client()
data_coordinator: HassioMainDataUpdateCoordinator = hass.data[MAIN_COORDINATOR]
assert not data_coordinator.jobs.current_jobs
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
assert not jobs_coordinator.current_jobs
# Make an example listener
job_data: Job | None = None
@@ -114,7 +125,7 @@ async def test_job_manager_ws_updates(
subscription = JobSubscription(
mock_subcription_callback, name="test_job", reference="test"
)
unsubscribe = data_coordinator.jobs.subscribe(subscription)
unsubscribe = jobs_coordinator.subscribe(subscription)
# Send start of job update
await client.send_json(
@@ -146,7 +157,7 @@ async def test_job_manager_ws_updates(
assert job_data.progress == 0
assert job_data.done is False
# One job in the cache
assert len(data_coordinator.jobs.current_jobs) == 1
assert len(jobs_coordinator.current_jobs) == 1
# Example progress update
await client.send_json(
@@ -178,7 +189,7 @@ async def test_job_manager_ws_updates(
assert job_data.progress == 50
assert job_data.done is False
# Same job, same number of jobs in cache
assert len(data_coordinator.jobs.current_jobs) == 1
assert len(jobs_coordinator.current_jobs) == 1
# Unrelated job update - name change, subscriber should not receive
await client.send_json(
@@ -208,7 +219,7 @@ async def test_job_manager_ws_updates(
assert job_data.name == "test_job"
assert job_data.reference == "test"
# New job, cache increases
assert len(data_coordinator.jobs.current_jobs) == 2
assert len(jobs_coordinator.current_jobs) == 2
# Unrelated job update - reference change, subscriber should not receive
await client.send_json(
@@ -238,7 +249,7 @@ async def test_job_manager_ws_updates(
assert job_data.name == "test_job"
assert job_data.reference == "test"
# New job, cache increases
assert len(data_coordinator.jobs.current_jobs) == 3
assert len(jobs_coordinator.current_jobs) == 3
# Unsubscribe mock listener, should not receive final update
unsubscribe()
@@ -271,7 +282,7 @@ async def test_job_manager_ws_updates(
assert job_data.progress == 50
assert job_data.done is False
# Job ended, cache decreases
assert len(data_coordinator.jobs.current_jobs) == 2
assert len(jobs_coordinator.current_jobs) == 2
# REST API should not be used during this sequence
jobs_info.assert_not_called()
@@ -306,9 +317,9 @@ async def test_job_manager_reload_on_supervisor_restart(
assert result
jobs_info.assert_called_once()
data_coordinator: HassioMainDataUpdateCoordinator = hass.data[MAIN_COORDINATOR]
assert len(data_coordinator.jobs.current_jobs) == 1
assert data_coordinator.jobs.current_jobs[0].name == "test_job"
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
assert len(jobs_coordinator.current_jobs) == 1
assert jobs_coordinator.current_jobs[0].name == "test_job"
jobs_info.reset_mock()
jobs_info.return_value = JobsInfo(ignore_conditions=[], jobs=[])
@@ -323,7 +334,7 @@ async def test_job_manager_reload_on_supervisor_restart(
job_data = job
subscription = JobSubscription(mock_subcription_callback, name="test_job")
data_coordinator.jobs.subscribe(subscription)
jobs_coordinator.subscribe(subscription)
# Send supervisor restart signal
await client.send_json(
@@ -341,9 +352,67 @@ async def test_job_manager_reload_on_supervisor_restart(
assert msg["success"]
await hass.async_block_till_done()
# Advance time past the debouncer cooldown for the refresh to complete
async_fire_time_changed(
hass, dt_util.utcnow() + dt_util.dt.timedelta(seconds=REQUEST_REFRESH_DELAY + 1)
)
await hass.async_block_till_done()
# Listener should be told job is done and cache cleared out
jobs_info.assert_called_once()
assert job_data.name == "test_job"
assert job_data.reference == "test"
assert job_data.done is True
assert not data_coordinator.jobs.current_jobs
assert not jobs_coordinator.current_jobs
@pytest.mark.usefixtures("all_setup_requests")
async def test_job_manager_periodic_refresh(
hass: HomeAssistant, jobs_info: AsyncMock
) -> None:
"""Test job manager performs periodic refresh as backstop for dropped WS events."""
jobs_info.return_value = JobsInfo(
ignore_conditions=[],
jobs=[
Job(
name="test_job",
reference="test",
uuid=uuid4(),
progress=0,
stage=None,
done=False,
errors=[],
created=datetime.now(), # pylint: disable=home-assistant-enforce-naive-now
extra=None,
child_jobs=[],
)
],
)
result = await async_setup_component(hass, DOMAIN, {})
assert result
jobs_info.assert_called_once()
jobs_coordinator: SupervisorJobsCoordinator = hass.data[JOBS_COORDINATOR]
assert len(jobs_coordinator.current_jobs) == 1
# Subscribe to job updates
job_data: Job | None = None
@callback
def mock_subscription_callback(job: Job) -> None:
nonlocal job_data
job_data = job
subscription = JobSubscription(mock_subscription_callback, name="test_job")
jobs_coordinator.subscribe(subscription)
# Reset mock to verify periodic refresh
jobs_info.reset_mock()
# Advance time past the SUPERVISOR_JOBS_UPDATE_INTERVAL to trigger periodic refresh
async_fire_time_changed(hass, dt_util.utcnow() + SUPERVISOR_JOBS_UPDATE_INTERVAL)
await hass.async_block_till_done()
# Periodic refresh should have called jobs_info
jobs_info.assert_called()