mirror of
https://github.com/home-assistant/core.git
synced 2026-02-26 04:01:10 +01:00
Compare commits
1 Commits
dev
...
rename_rac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a13c0effb8 |
@@ -28,6 +28,11 @@ def async_setup(hass: HomeAssistant) -> None:
|
||||
assert event.data["action"] == "update" and "old_entity_id" in event.data
|
||||
old_entity_id = event.data["old_entity_id"]
|
||||
new_entity_id = event.data["entity_id"]
|
||||
# Notify the states meta manager about the pending rename so
|
||||
# that any StatisticsTask that runs before the actual database
|
||||
# update can still resolve the new entity_id to the correct
|
||||
# metadata_id.
|
||||
instance.states_meta_manager.queue_rename(old_entity_id, new_entity_id)
|
||||
async_update_statistics_metadata(
|
||||
hass, old_entity_id, new_statistic_id=new_entity_id
|
||||
)
|
||||
|
||||
@@ -952,7 +952,13 @@ def async_update_statistics_metadata(
|
||||
f"for unit_class '{new_unit_class}'"
|
||||
)
|
||||
|
||||
get_instance(hass).async_update_statistics_metadata(
|
||||
instance = get_instance(hass)
|
||||
# Notify the statistics meta manager about the pending rename so
|
||||
# that any StatisticsTask that runs before the actual database
|
||||
# update can still resolve the new statistic_id.
|
||||
if new_statistic_id is not UNDEFINED and new_statistic_id is not None:
|
||||
instance.statistics_meta_manager.queue_rename(statistic_id, new_statistic_id)
|
||||
instance.async_update_statistics_metadata(
|
||||
statistic_id,
|
||||
new_statistic_id=new_statistic_id,
|
||||
new_unit_class=new_unit_class,
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable, Sequence
|
||||
from queue import SimpleQueue
|
||||
from typing import TYPE_CHECKING, cast
|
||||
|
||||
from sqlalchemy.orm.session import Session
|
||||
@@ -27,8 +28,32 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
|
||||
def __init__(self, recorder: Recorder) -> None:
|
||||
"""Initialize the states meta manager."""
|
||||
self._did_first_load = False
|
||||
# Thread-safe queue for entity_id renames from the event loop.
|
||||
# Items are (old_entity_id, new_entity_id) tuples.
|
||||
self._rename_queue: SimpleQueue[tuple[str, str]] = SimpleQueue()
|
||||
# Recorder-thread-only dict mapping new_entity_id -> old_entity_id
|
||||
# for renames that haven't been applied to the database yet.
|
||||
self._pending_rename: dict[str, str] = {}
|
||||
super().__init__(recorder, CACHE_SIZE)
|
||||
|
||||
def queue_rename(self, old_entity_id: str, new_entity_id: str) -> None:
|
||||
"""Queue an entity_id rename notification.
|
||||
|
||||
This method is thread-safe and is called from the event loop
|
||||
to notify the recorder thread about a pending entity_id rename.
|
||||
"""
|
||||
self._rename_queue.put((old_entity_id, new_entity_id))
|
||||
|
||||
def drain_pending_renames(self) -> None:
|
||||
"""Drain the rename queue into the pending rename dict.
|
||||
|
||||
This call is not thread-safe and must be called from the
|
||||
recorder thread.
|
||||
"""
|
||||
while not self._rename_queue.empty():
|
||||
old_entity_id, new_entity_id = self._rename_queue.get_nowait()
|
||||
self._pending_rename[new_entity_id] = old_entity_id
|
||||
|
||||
def load(
|
||||
self, events: list[Event[EventStateChangedData]], session: Session
|
||||
) -> None:
|
||||
@@ -117,6 +142,21 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
|
||||
if update_cache:
|
||||
self._id_map[entity_id] = metadata_id
|
||||
|
||||
if not from_recorder:
|
||||
return results
|
||||
|
||||
# Check pending renames for any entity_ids still not resolved.
|
||||
# If an entity_id was renamed but the database hasn't been updated
|
||||
# yet, we can resolve the new entity_id by looking up the old one.
|
||||
pending_rename = self._pending_rename
|
||||
for entity_id in missing:
|
||||
if (
|
||||
results.get(entity_id) is None
|
||||
and (old_entity_id := pending_rename.get(entity_id)) is not None
|
||||
and (metadata_id := self._id_map.get(old_entity_id)) is not None
|
||||
):
|
||||
results[entity_id] = metadata_id
|
||||
|
||||
return results
|
||||
|
||||
def add_pending(self, db_states_meta: StatesMeta) -> None:
|
||||
@@ -155,12 +195,18 @@ class StatesMetaManager(BaseLRUTableManager[StatesMeta]):
|
||||
new_entity_id: str,
|
||||
) -> bool:
|
||||
"""Update states metadata for an entity_id."""
|
||||
# Clear the pending rename before the collision check so
|
||||
# get() doesn't resolve new_entity_id via the side channel.
|
||||
self._pending_rename.pop(new_entity_id, None)
|
||||
if self.get(new_entity_id, session, True) is not None:
|
||||
# If the new entity id already exists we have
|
||||
# a collision and should not update.
|
||||
return False
|
||||
metadata_id = self._id_map.get(entity_id)
|
||||
session.query(StatesMeta).filter(StatesMeta.entity_id == entity_id).update(
|
||||
{StatesMeta.entity_id: new_entity_id}
|
||||
)
|
||||
self._id_map.pop(entity_id, None)
|
||||
if metadata_id is not None:
|
||||
self._id_map[new_entity_id] = metadata_id
|
||||
return True
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
"""Support managing StatesMeta."""
|
||||
"""Support managing StatisticsMeta."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from queue import SimpleQueue
|
||||
import threading
|
||||
from typing import TYPE_CHECKING, Any, Final, Literal
|
||||
|
||||
@@ -88,12 +89,36 @@ class StatisticsMetaManager:
|
||||
self._stat_id_to_id_meta: LRU[str, tuple[int, StatisticMetaData]] = LRU(
|
||||
CACHE_SIZE
|
||||
)
|
||||
# Thread-safe queue for statistic_id renames from the event loop.
|
||||
# Items are (old_statistic_id, new_statistic_id) tuples.
|
||||
self._rename_queue: SimpleQueue[tuple[str, str]] = SimpleQueue()
|
||||
# Recorder-thread-only dict mapping new_statistic_id -> old_statistic_id
|
||||
# for renames that haven't been applied to the database yet.
|
||||
self._pending_rename: dict[str, str] = {}
|
||||
|
||||
def _clear_cache(self, statistic_ids: list[str]) -> None:
|
||||
"""Clear the cache."""
|
||||
for statistic_id in statistic_ids:
|
||||
self._stat_id_to_id_meta.pop(statistic_id, None)
|
||||
|
||||
def queue_rename(self, old_statistic_id: str, new_statistic_id: str) -> None:
|
||||
"""Queue a statistic_id rename notification.
|
||||
|
||||
This method is thread-safe and is called from the event loop
|
||||
to notify the recorder thread about a pending statistic_id rename.
|
||||
"""
|
||||
self._rename_queue.put((old_statistic_id, new_statistic_id))
|
||||
|
||||
def drain_pending_renames(self) -> None:
|
||||
"""Drain the rename queue into the pending rename dict.
|
||||
|
||||
This call is not thread-safe and must be called from the
|
||||
recorder thread.
|
||||
"""
|
||||
while not self._rename_queue.empty():
|
||||
old_statistic_id, new_statistic_id = self._rename_queue.get_nowait()
|
||||
self._pending_rename[new_statistic_id] = old_statistic_id
|
||||
|
||||
def _get_from_database(
|
||||
self,
|
||||
session: Session,
|
||||
@@ -293,9 +318,28 @@ class StatisticsMetaManager:
|
||||
return results
|
||||
|
||||
# Fetch metadata from the database
|
||||
return results | self._get_from_database(
|
||||
session, statistic_ids=missing_statistic_id
|
||||
)
|
||||
results |= self._get_from_database(session, statistic_ids=missing_statistic_id)
|
||||
|
||||
# Check pending renames for any statistic_ids still not resolved.
|
||||
# If a statistic_id was renamed but the database hasn't been
|
||||
# updated yet, resolve the new statistic_id using the old one.
|
||||
if self.recorder.thread_id == threading.get_ident() and (
|
||||
pending_rename := self._pending_rename
|
||||
):
|
||||
for statistic_id in missing_statistic_id:
|
||||
if (
|
||||
statistic_id not in results
|
||||
and (old_id := pending_rename.get(statistic_id)) is not None
|
||||
):
|
||||
# Try cache first, then database for the old statistic_id
|
||||
if id_meta := self._stat_id_to_id_meta.get(old_id):
|
||||
results[statistic_id] = id_meta
|
||||
elif db_result := self._get_from_database(
|
||||
session, statistic_ids={old_id}
|
||||
):
|
||||
results[statistic_id] = next(iter(db_result.values()))
|
||||
|
||||
return results
|
||||
|
||||
def get_from_cache_threadsafe(
|
||||
self, statistic_ids: set[str]
|
||||
@@ -377,6 +421,9 @@ class StatisticsMetaManager:
|
||||
recorder thread.
|
||||
"""
|
||||
self._assert_in_recorder_thread()
|
||||
# Clear the pending rename before the collision check so
|
||||
# get() doesn't resolve new_statistic_id via the side channel.
|
||||
self._pending_rename.pop(new_statistic_id, None)
|
||||
if self.get(session, new_statistic_id):
|
||||
_LOGGER.error(
|
||||
"Cannot rename statistic_id `%s` to `%s` because the new statistic_id is already in use",
|
||||
|
||||
@@ -82,6 +82,7 @@ class UpdateStatisticsMetadataTask(RecorderTask):
|
||||
|
||||
def run(self, instance: Recorder) -> None:
|
||||
"""Handle the task."""
|
||||
instance.statistics_meta_manager.drain_pending_renames()
|
||||
statistics.update_statistics_metadata(
|
||||
instance,
|
||||
self.statistic_id,
|
||||
@@ -102,6 +103,7 @@ class UpdateStatesMetadataTask(RecorderTask):
|
||||
|
||||
def run(self, instance: Recorder) -> None:
|
||||
"""Handle the task."""
|
||||
instance.states_meta_manager.drain_pending_renames()
|
||||
entity_registry.update_states_metadata(
|
||||
instance,
|
||||
self.entity_id,
|
||||
@@ -169,6 +171,11 @@ class StatisticsTask(RecorderTask):
|
||||
|
||||
def run(self, instance: Recorder) -> None:
|
||||
"""Run statistics task."""
|
||||
# Drain any pending entity_id/statistic_id renames so the
|
||||
# compilation can resolve new ids that the database doesn't
|
||||
# know about yet.
|
||||
instance.states_meta_manager.drain_pending_renames()
|
||||
instance.statistics_meta_manager.drain_pending_renames()
|
||||
if statistics.compile_statistics(instance, self.start, self.fire_events):
|
||||
return
|
||||
# Schedule a new statistics task if this one didn't finish
|
||||
@@ -181,6 +188,8 @@ class CompileMissingStatisticsTask(RecorderTask):
|
||||
|
||||
def run(self, instance: Recorder) -> None:
|
||||
"""Run statistics task to compile missing statistics."""
|
||||
instance.states_meta_manager.drain_pending_renames()
|
||||
instance.statistics_meta_manager.drain_pending_renames()
|
||||
if statistics.compile_missing_statistics(instance):
|
||||
return
|
||||
# Schedule a new statistics task if this one didn't finish
|
||||
|
||||
Reference in New Issue
Block a user