Compare commits

...

1 Commits

Author SHA1 Message Date
J. Nick Koston
a13c0effb8 attempt to fix rename race 2026-02-24 10:04:27 -06:00
5 changed files with 118 additions and 5 deletions

View File

@@ -28,6 +28,11 @@ def async_setup(hass: HomeAssistant) -> None:
assert event.data["action"] == "update" and "old_entity_id" in event.data
old_entity_id = event.data["old_entity_id"]
new_entity_id = event.data["entity_id"]
# Notify the states meta manager about the pending rename so
# that any StatisticsTask that runs before the actual database
# update can still resolve the new entity_id to the correct
# metadata_id.
instance.states_meta_manager.queue_rename(old_entity_id, new_entity_id)
async_update_statistics_metadata(
hass, old_entity_id, new_statistic_id=new_entity_id
)

View File

@@ -952,7 +952,13 @@ def async_update_statistics_metadata(
f"for unit_class '{new_unit_class}'"
)
get_instance(hass).async_update_statistics_metadata(
instance = get_instance(hass)
# Notify the statistics meta manager about the pending rename so
# that any StatisticsTask that runs before the actual database
# update can still resolve the new statistic_id.
if new_statistic_id is not UNDEFINED and new_statistic_id is not None:
instance.statistics_meta_manager.queue_rename(statistic_id, new_statistic_id)
instance.async_update_statistics_metadata(
statistic_id,
new_statistic_id=new_statistic_id,
new_unit_class=new_unit_class,

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
from collections.abc import Iterable, Sequence
from queue import SimpleQueue
from typing import TYPE_CHECKING, cast
from sqlalchemy.orm.session import Session
@@ -27,8 +28,32 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
def __init__(self, recorder: Recorder) -> None:
"""Initialize the states meta manager."""
self._did_first_load = False
# Thread-safe queue for entity_id renames from the event loop.
# Items are (old_entity_id, new_entity_id) tuples.
self._rename_queue: SimpleQueue[tuple[str, str]] = SimpleQueue()
# Recorder-thread-only dict mapping new_entity_id -> old_entity_id
# for renames that haven't been applied to the database yet.
self._pending_rename: dict[str, str] = {}
super().__init__(recorder, CACHE_SIZE)
def queue_rename(self, old_entity_id: str, new_entity_id: str) -> None:
"""Queue an entity_id rename notification.
This method is thread-safe and is called from the event loop
to notify the recorder thread about a pending entity_id rename.
"""
self._rename_queue.put((old_entity_id, new_entity_id))
def drain_pending_renames(self) -> None:
"""Drain the rename queue into the pending rename dict.
This call is not thread-safe and must be called from the
recorder thread.
"""
while not self._rename_queue.empty():
old_entity_id, new_entity_id = self._rename_queue.get_nowait()
self._pending_rename[new_entity_id] = old_entity_id
def load(
self, events: list[Event[EventStateChangedData]], session: Session
) -> None:
@@ -117,6 +142,21 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
if update_cache:
self._id_map[entity_id] = metadata_id
if not from_recorder:
return results
# Check pending renames for any entity_ids still not resolved.
# If an entity_id was renamed but the database hasn't been updated
# yet, we can resolve the new entity_id by looking up the old one.
pending_rename = self._pending_rename
for entity_id in missing:
if (
results.get(entity_id) is None
and (old_entity_id := pending_rename.get(entity_id)) is not None
and (metadata_id := self._id_map.get(old_entity_id)) is not None
):
results[entity_id] = metadata_id
return results
def add_pending(self, db_states_meta: StatesMeta) -> None:
@@ -155,12 +195,18 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
new_entity_id: str,
) -> bool:
"""Update states metadata for an entity_id."""
# Clear the pending rename before the collision check so
# get() doesn't resolve new_entity_id via the side channel.
self._pending_rename.pop(new_entity_id, None)
if self.get(new_entity_id, session, True) is not None:
# If the new entity id already exists we have
# a collision and should not update.
return False
metadata_id = self._id_map.get(entity_id)
session.query(StatesMeta).filter(StatesMeta.entity_id == entity_id).update(
{StatesMeta.entity_id: new_entity_id}
)
self._id_map.pop(entity_id, None)
if metadata_id is not None:
self._id_map[new_entity_id] = metadata_id
return True

View File

@@ -1,8 +1,9 @@
"""Support managing StatesMeta."""
"""Support managing StatisticsMeta."""
from __future__ import annotations
import logging
from queue import SimpleQueue
import threading
from typing import TYPE_CHECKING, Any, Final, Literal
@@ -88,12 +89,36 @@ class StatisticsMetaManager:
self._stat_id_to_id_meta: LRU[str, tuple[int, StatisticMetaData]] = LRU(
CACHE_SIZE
)
# Thread-safe queue for statistic_id renames from the event loop.
# Items are (old_statistic_id, new_statistic_id) tuples.
self._rename_queue: SimpleQueue[tuple[str, str]] = SimpleQueue()
# Recorder-thread-only dict mapping new_statistic_id -> old_statistic_id
# for renames that haven't been applied to the database yet.
self._pending_rename: dict[str, str] = {}
def _clear_cache(self, statistic_ids: list[str]) -> None:
"""Clear the cache."""
for statistic_id in statistic_ids:
self._stat_id_to_id_meta.pop(statistic_id, None)
def queue_rename(self, old_statistic_id: str, new_statistic_id: str) -> None:
"""Queue a statistic_id rename notification.
This method is thread-safe and is called from the event loop
to notify the recorder thread about a pending statistic_id rename.
"""
self._rename_queue.put((old_statistic_id, new_statistic_id))
def drain_pending_renames(self) -> None:
"""Drain the rename queue into the pending rename dict.
This call is not thread-safe and must be called from the
recorder thread.
"""
while not self._rename_queue.empty():
old_statistic_id, new_statistic_id = self._rename_queue.get_nowait()
self._pending_rename[new_statistic_id] = old_statistic_id
def _get_from_database(
self,
session: Session,
@@ -293,9 +318,28 @@ class StatisticsMetaManager:
return results
# Fetch metadata from the database
return results | self._get_from_database(
session, statistic_ids=missing_statistic_id
)
results |= self._get_from_database(session, statistic_ids=missing_statistic_id)
# Check pending renames for any statistic_ids still not resolved.
# If a statistic_id was renamed but the database hasn't been
# updated yet, resolve the new statistic_id using the old one.
if self.recorder.thread_id == threading.get_ident() and (
pending_rename := self._pending_rename
):
for statistic_id in missing_statistic_id:
if (
statistic_id not in results
and (old_id := pending_rename.get(statistic_id)) is not None
):
# Try cache first, then database for the old statistic_id
if id_meta := self._stat_id_to_id_meta.get(old_id):
results[statistic_id] = id_meta
elif db_result := self._get_from_database(
session, statistic_ids={old_id}
):
results[statistic_id] = next(iter(db_result.values()))
return results
def get_from_cache_threadsafe(
self, statistic_ids: set[str]
@@ -377,6 +421,9 @@ class StatisticsMetaManager:
recorder thread.
"""
self._assert_in_recorder_thread()
# Clear the pending rename before the collision check so
# get() doesn't resolve new_statistic_id via the side channel.
self._pending_rename.pop(new_statistic_id, None)
if self.get(session, new_statistic_id):
_LOGGER.error(
"Cannot rename statistic_id `%s` to `%s` because the new statistic_id is already in use",

View File

@@ -82,6 +82,7 @@ class UpdateStatisticsMetadataTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task."""
instance.statistics_meta_manager.drain_pending_renames()
statistics.update_statistics_metadata(
instance,
self.statistic_id,
@@ -102,6 +103,7 @@ class UpdateStatesMetadataTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Handle the task."""
instance.states_meta_manager.drain_pending_renames()
entity_registry.update_states_metadata(
instance,
self.entity_id,
@@ -169,6 +171,11 @@ class StatisticsTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Run statistics task."""
# Drain any pending entity_id/statistic_id renames so the
# compilation can resolve new ids that the database doesn't
# know about yet.
instance.states_meta_manager.drain_pending_renames()
instance.statistics_meta_manager.drain_pending_renames()
if statistics.compile_statistics(instance, self.start, self.fire_events):
return
# Schedule a new statistics task if this one didn't finish
@@ -181,6 +188,8 @@ class CompileMissingStatisticsTask(RecorderTask):
def run(self, instance: Recorder) -> None:
"""Run statistics task to compile missing statistics."""
instance.states_meta_manager.drain_pending_renames()
instance.statistics_meta_manager.drain_pending_renames()
if statistics.compile_missing_statistics(instance):
return
# Schedule a new statistics task if this one didn't finish