This commit is contained in:
J. Nick Koston
2023-03-10 08:54:40 -10:00
parent 532f26bc5b
commit 81464050dd
4 changed files with 49 additions and 0 deletions

View File

@@ -164,6 +164,7 @@ class EventTypeManager:
"""Initialize the event manager.""" """Initialize the event manager."""
self._id_map: dict[str, int] = LRU(EVENT_DATA_ID_CACHE_SIZE) self._id_map: dict[str, int] = LRU(EVENT_DATA_ID_CACHE_SIZE)
self._pending: dict[str, EventTypes] = {} self._pending: dict[str, EventTypes] = {}
self.active = False
def load(self, events: list[Event], session: Session) -> None: def load(self, events: list[Event], session: Session) -> None:
"""Load the event types into memory.""" """Load the event types into memory."""
@@ -222,6 +223,11 @@ class EventTypeManager:
self._id_map.clear() self._id_map.clear()
self._pending.clear() self._pending.clear()
def evict_purged(self, event_types: Iterable[str]) -> None:
"""Evict purged event types."""
for event_type in event_types:
self._id_map.pop(event_type, None)
class Recorder(threading.Thread): class Recorder(threading.Thread):
"""A threaded recorder class.""" """A threaded recorder class."""

View File

@@ -1341,6 +1341,7 @@ def migrate_event_type_ids(instance: Recorder) -> bool:
_drop_index( _drop_index(
session_maker, "events", "ix_events_event_type_timed_fired_ts", quiet=True session_maker, "events", "ix_events_event_type_timed_fired_ts", quiet=True
) )
instance.event_type_manager.active = True
_LOGGER.debug("Migrating event_types done=%s", is_done) _LOGGER.debug("Migrating event_types done=%s", is_done)
return is_done return is_done

View File

@@ -24,12 +24,14 @@ from .queries import (
data_ids_exist_in_events_with_fast_in_distinct, data_ids_exist_in_events_with_fast_in_distinct,
delete_event_data_rows, delete_event_data_rows,
delete_event_rows, delete_event_rows,
delete_event_types_rows,
delete_recorder_runs_rows, delete_recorder_runs_rows,
delete_states_attributes_rows, delete_states_attributes_rows,
delete_states_rows, delete_states_rows,
delete_statistics_runs_rows, delete_statistics_runs_rows,
delete_statistics_short_term_rows, delete_statistics_short_term_rows,
disconnect_states_rows, disconnect_states_rows,
find_event_types_to_purge,
find_events_to_purge, find_events_to_purge,
find_latest_statistics_runs_run_id, find_latest_statistics_runs_run_id,
find_legacy_event_state_and_attributes_and_data_ids_to_purge, find_legacy_event_state_and_attributes_and_data_ids_to_purge,
@@ -110,6 +112,7 @@ def purge_old_data(
return False return False
_purge_old_recorder_runs(instance, session, purge_before) _purge_old_recorder_runs(instance, session, purge_before)
_purge_old_event_types(instance, session)
if repack: if repack:
repack_database(instance) repack_database(instance)
return True return True
@@ -564,6 +567,22 @@ def _purge_old_recorder_runs(
_LOGGER.debug("Deleted %s recorder_runs", deleted_rows) _LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
def _purge_old_event_types(instance: Recorder, session: Session) -> None:
"""Purge all old event types."""
# Event types is small, no need to batch run it
purged_event_types = set()
event_type_ids = set()
for event_type_id, event_type in session.execute(find_event_types_to_purge()):
purged_event_types.add(event_type)
event_type_ids.add(event_type_id)
deleted_rows = session.execute(delete_event_types_rows(event_type_ids))
_LOGGER.debug("Deleted %s event types", deleted_rows)
# Evict any entries in the event_type cache referring to a purged state
instance.event_type_manager.evict_purged(purged_event_types)
def _purge_filtered_data(instance: Recorder, session: Session) -> bool: def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
"""Remove filtered states and events that shouldn't be in the database.""" """Remove filtered states and events that shouldn't be in the database."""
_LOGGER.debug("Cleanup filtered data") _LOGGER.debug("Cleanup filtered data")

View File

@@ -737,3 +737,26 @@ def find_states_context_ids_to_migrate() -> StatementLambdaElement:
.filter(States.context_id_bin.is_(None)) .filter(States.context_id_bin.is_(None))
.limit(SQLITE_MAX_BIND_VARS) .limit(SQLITE_MAX_BIND_VARS)
) )
def find_event_types_to_purge() -> StatementLambdaElement:
"""Find event_type_ids to purge."""
return lambda_stmt(
lambda: select(EventTypes.event_type_id, EventTypes.event_type).where(
EventTypes.event_type_id
== (
select(
distinct(Events.event_type_id).label("unused_event_type_id")
).filter(Events.event_type_id.not_in(select(EventTypes.event_type_id)))
).c.unused_event_type_id
)
)
def delete_event_types_rows(event_type_ids: Iterable[int]) -> StatementLambdaElement:
"""Delete EventTypes rows."""
return lambda_stmt(
lambda: delete(EventTypes)
.where(EventTypes.event_type_id.in_(event_type_ids))
.execution_options(synchronize_session=False)
)