Inline force_data_refresh into coordinator update methods

Replace the indirect data flow (force_data_refresh writes to hass.data,
_async_update_data reads it back) with direct fetching in
_async_update_data. Coordinator data is now built directly from API
results, and hass.data is updated separately for legacy accessor
functions.

This ensures deprecated compatibility keys (repositories, addons
folded into supervisor_info) are only written to hass.data and do not
leak into coordinator data.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Stefan Agner
2026-04-10 12:14:14 +02:00
parent 7e6cc18489
commit 0dd31eedc9

View File

@@ -463,17 +463,44 @@ class HassioAddOnDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
async def _async_update_data(self) -> dict[str, Any]:
"""Update data via library."""
is_first_update = not self.data
client = self.supervisor_client
try:
await self.force_data_refresh(is_first_update)
installed_addons: list[InstalledAddon] = await client.addons.list()
all_addons = {addon.slug for addon in installed_addons}
# Fetch addon info for all addons on first update, or only
# for addons with subscribed entities on subsequent updates.
addon_info_results = dict(
await asyncio.gather(
*[
self._update_addon_info(slug)
for slug in all_addons
if is_first_update or self._addon_info_subscriptions.get(slug)
]
)
)
except SupervisorError as err:
raise UpdateFailed(f"Error on Supervisor API: {err}") from err
new_data: dict[str, Any] = {}
addons_info = get_addons_info(self.hass) or {}
store_data = get_store(self.hass)
addons_list = get_addons_list(self.hass) or []
# Update hass.data for legacy accessor functions
data = self.hass.data
addons_list_dicts = [addon.to_dict() for addon in installed_addons]
data[DATA_ADDONS_LIST] = addons_list_dicts
# Update addon info cache in hass.data
addon_info_cache: dict[str, Any] = data.setdefault(DATA_ADDONS_INFO, {})
for slug in addon_info_cache.keys() - all_addons:
del addon_info_cache[slug]
addon_info_cache.update(addon_info_results)
# Deprecated 2026.4.0: Folding addons.list results into supervisor_info
# for compatibility. Written to hass.data only, not coordinator data.
if DATA_SUPERVISOR_INFO in data:
data[DATA_SUPERVISOR_INFO]["addons"] = addons_list_dicts
# Build clean coordinator data
store_data = get_store(self.hass)
if store_data:
repositories = {
repo.slug: repo.name
@@ -482,20 +509,21 @@ class HassioAddOnDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
else:
repositories = {}
new_data: dict[str, Any] = {}
new_data[DATA_KEY_ADDONS] = {
(slug := addon[ATTR_SLUG]): {
**addon,
ATTR_AUTO_UPDATE: (addons_info.get(slug) or {}).get(
ATTR_AUTO_UPDATE: (addon_info_cache.get(slug) or {}).get(
ATTR_AUTO_UPDATE, False
),
ATTR_REPOSITORY: repositories.get(
repo_slug := addon.get(ATTR_REPOSITORY, ""), repo_slug
),
}
for addon in addons_list
for addon in addons_list_dicts
}
# If this is the initial refresh, register all addons and return the dict
# If this is the initial refresh, register all addons
if is_first_update:
async_register_addons_in_dev_reg(
self.entry_id, self.dev_reg, new_data[DATA_KEY_ADDONS].values()
@@ -532,42 +560,6 @@ class HassioAddOnDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
except SupervisorNotFoundError:
return None
async def force_data_refresh(self, first_update: bool) -> None:
"""Force update of the addon info."""
data = self.hass.data
client = self.supervisor_client
installed_addons: list[InstalledAddon] = await client.addons.list()
data[DATA_ADDONS_LIST] = [addon.to_dict() for addon in installed_addons]
# Deprecated 2026.4.0: Folding addons.list results into supervisor_info for compatibility
# Can drop this after removal period
if DATA_SUPERVISOR_INFO in data:
data[DATA_SUPERVISOR_INFO]["addons"] = data[DATA_ADDONS_LIST]
all_addons = {addon.slug for addon in installed_addons}
# Update addon info if it's the first update or
# there is at least one entity that needs the data.
addon_info: dict[str, Any] = data.setdefault(DATA_ADDONS_INFO, {})
# Clean up cache
for slug in addon_info.keys() - all_addons:
del addon_info[slug]
# Update cache from API
addon_info.update(
dict(
await asyncio.gather(
*[
self._update_addon_info(slug)
for slug in all_addons
if (first_update) or self._addon_info_subscriptions.get(slug)
]
)
)
)
async def _update_addon_info(self, slug: str) -> tuple[str, dict[str, Any] | None]:
"""Return the info for an addon."""
try:
@@ -666,23 +658,54 @@ class HassioDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
async def _async_update_data(self) -> dict[str, Any]:
"""Update data via library."""
is_first_update = not self.data
client = self.supervisor_client
try:
await self.force_data_refresh(is_first_update)
(
info,
core_info,
supervisor_info,
os_info,
host_info,
store_info,
network_info,
) = await asyncio.gather(
client.info(),
client.homeassistant.info(),
client.supervisor.info(),
client.os.info(),
client.host.info(),
client.store.info(),
client.network.info(),
)
mounts_info = await client.mounts.info()
await self.jobs.refresh_data(is_first_update)
except SupervisorError as err:
raise UpdateFailed(f"Error on Supervisor API: {err}") from err
# Build clean coordinator data
new_data: dict[str, Any] = {}
supervisor_info = get_supervisor_info(self.hass) or {}
mounts_info = await self.supervisor_client.mounts.info()
if self.is_hass_os:
new_data[DATA_KEY_OS] = get_os_info(self.hass)
new_data[DATA_KEY_CORE] = get_core_info(self.hass) or {}
new_data[DATA_KEY_SUPERVISOR] = supervisor_info
new_data[DATA_KEY_HOST] = get_host_info(self.hass) or {}
new_data[DATA_KEY_CORE] = core_info.to_dict()
new_data[DATA_KEY_SUPERVISOR] = supervisor_info.to_dict()
new_data[DATA_KEY_HOST] = host_info.to_dict()
new_data[DATA_KEY_MOUNTS] = {mount.name: mount for mount in mounts_info.mounts}
if self.is_hass_os:
new_data[DATA_KEY_OS] = os_info.to_dict()
# Update hass.data for legacy accessor functions
data = self.hass.data
data[DATA_INFO] = info.to_dict()
data[DATA_CORE_INFO] = new_data[DATA_KEY_CORE]
data[DATA_OS_INFO] = new_data.get(DATA_KEY_OS, os_info.to_dict())
data[DATA_HOST_INFO] = new_data[DATA_KEY_HOST]
data[DATA_STORE] = store_info.to_dict()
data[DATA_NETWORK_INFO] = network_info.to_dict()
# Separate dict for hass.data supervisor info since we add deprecated
# compat keys that should not be in coordinator data
data[DATA_SUPERVISOR_INFO] = supervisor_info.to_dict()
# Deprecated 2026.4.0: Folding repositories into supervisor_info for
# compatibility. Written to hass.data only, not coordinator data.
data[DATA_SUPERVISOR_INFO]["repositories"] = data[DATA_STORE][ATTR_REPOSITORIES]
# If this is the initial refresh, register all main components
if is_first_update:
@@ -733,32 +756,6 @@ class HassioDataUpdateCoordinator(DataUpdateCoordinator[dict[str, Any]]):
return new_data
async def force_data_refresh(self, first_update: bool) -> None:
"""Force update of the main component info."""
data = self.hass.data
client = self.supervisor_client
updates: dict[str, Awaitable[ResponseData]] = {
DATA_INFO: client.info(),
DATA_CORE_INFO: client.homeassistant.info(),
DATA_SUPERVISOR_INFO: client.supervisor.info(),
DATA_OS_INFO: client.os.info(),
DATA_HOST_INFO: client.host.info(),
DATA_STORE: client.store.info(),
DATA_NETWORK_INFO: client.network.info(),
}
api_results: list[ResponseData] = await asyncio.gather(*updates.values())
for key, result in zip(updates, api_results, strict=True):
data[key] = result.to_dict()
# Deprecated 2026.4.0: Folding repositories into supervisor_info for compatibility
# Can drop this after removal period
data[DATA_SUPERVISOR_INFO]["repositories"] = data[DATA_STORE][ATTR_REPOSITORIES]
# Refresh jobs data
await self.jobs.refresh_data(first_update)
async def _async_refresh(
self,
log_failures: bool = True,