diff --git a/homeassistant/components/logbook/queries/common.py b/homeassistant/components/logbook/queries/common.py index a0c8ddbdda2..8645c8f68cb 100644 --- a/homeassistant/components/logbook/queries/common.py +++ b/homeassistant/components/logbook/queries/common.py @@ -17,10 +17,12 @@ from homeassistant.components.recorder.db_schema import ( STATES_CONTEXT_ID_BIN_INDEX, EventData, Events, + EventTypes, StateAttributes, States, ) from homeassistant.components.recorder.filters import like_domain_matchers +from homeassistant.components.recorder.queries import select_event_type_ids from ..const import ALWAYS_CONTINUOUS_DOMAINS, CONDITIONALLY_CONTINUOUS_DOMAINS @@ -44,7 +46,7 @@ PSEUDO_EVENT_STATE_CHANGED: Final = None EVENT_COLUMNS = ( Events.event_id.label("event_id"), - Events.event_type.label("event_type"), + EventTypes.event_type.label("event_type"), Events.event_data.label("event_data"), Events.time_fired_ts.label("time_fired_ts"), Events.context_id_bin.label("context_id_bin"), @@ -115,7 +117,8 @@ def select_events_context_id_subquery( return ( select(Events.context_id_bin) .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day)) - .where(Events.event_type.in_(event_types)) + .where(Events.event_type_id.in_(select_event_type_ids(event_types))) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) .outerjoin(EventData, (Events.data_id == EventData.data_id)) ) @@ -147,7 +150,8 @@ def select_events_without_states( return ( select(*EVENT_ROWS_NO_STATES, NOT_CONTEXT_ONLY) .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day)) - .where(Events.event_type.in_(event_types)) + .where(Events.event_type_id.in_(select_event_type_ids(event_types))) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) .outerjoin(EventData, (Events.data_id == EventData.data_id)) ) @@ -182,6 +186,7 @@ def legacy_select_events_context_id( .outerjoin( StateAttributes, (States.attributes_id == StateAttributes.attributes_id) ) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day)) .where(Events.context_id_bin == context_id_bin) ) diff --git a/homeassistant/components/logbook/queries/devices.py b/homeassistant/components/logbook/queries/devices.py index d84a5343108..687c48b8921 100644 --- a/homeassistant/components/logbook/queries/devices.py +++ b/homeassistant/components/logbook/queries/devices.py @@ -13,6 +13,7 @@ from homeassistant.components.recorder.db_schema import ( DEVICE_ID_IN_EVENT, EventData, Events, + EventTypes, States, ) @@ -60,7 +61,9 @@ def _apply_devices_context_union( select_events_context_only() .select_from(devices_cte) .outerjoin(Events, devices_cte.c.context_id_bin == Events.context_id_bin) - ).outerjoin(EventData, (Events.data_id == EventData.data_id)), + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) + .outerjoin(EventData, (Events.data_id == EventData.data_id)), + ), apply_states_context_hints( select_states_context_only() .select_from(devices_cte) diff --git a/homeassistant/components/logbook/queries/entities.py b/homeassistant/components/logbook/queries/entities.py index 10ca6fad134..e0ae32b6694 100644 --- a/homeassistant/components/logbook/queries/entities.py +++ b/homeassistant/components/logbook/queries/entities.py @@ -15,6 +15,7 @@ from homeassistant.components.recorder.db_schema import ( OLD_ENTITY_ID_IN_EVENT, EventData, Events, + EventTypes, States, ) @@ -78,7 +79,9 @@ def 
_apply_entities_context_union( select_events_context_only() .select_from(entities_cte) .outerjoin(Events, entities_cte.c.context_id_bin == Events.context_id_bin) - ).outerjoin(EventData, (Events.data_id == EventData.data_id)), + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) + .outerjoin(EventData, (Events.data_id == EventData.data_id)) + ), apply_states_context_hints( select_states_context_only() .select_from(entities_cte) diff --git a/homeassistant/components/logbook/queries/entities_and_devices.py b/homeassistant/components/logbook/queries/entities_and_devices.py index b4a1c7bc9f8..677feddda84 100644 --- a/homeassistant/components/logbook/queries/entities_and_devices.py +++ b/homeassistant/components/logbook/queries/entities_and_devices.py @@ -8,7 +8,12 @@ from sqlalchemy.sql.elements import ColumnElement from sqlalchemy.sql.lambdas import StatementLambdaElement from sqlalchemy.sql.selectable import CTE, CompoundSelect, Select -from homeassistant.components.recorder.db_schema import EventData, Events, States +from homeassistant.components.recorder.db_schema import ( + EventData, + Events, + EventTypes, + States, +) from .common import ( apply_events_context_hints, @@ -80,7 +85,9 @@ def _apply_entities_devices_context_union( .outerjoin( Events, devices_entities_cte.c.context_id_bin == Events.context_id_bin ) - ).outerjoin(EventData, (Events.data_id == EventData.data_id)), + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) + .outerjoin(EventData, (Events.data_id == EventData.data_id)), + ), apply_states_context_hints( select_states_context_only() .select_from(devices_entities_cte) diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index 7e3f08d7abd..97d72c7f85c 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -61,6 +61,7 @@ from .db_schema import ( Base, EventData, Events, + EventTypes, StateAttributes, States, Statistics, @@ -81,8 +82,10 @@ from .queries import ( find_shared_data_id, get_shared_attributes, get_shared_event_datas, + has_event_type_to_migrate, ) from .run_history import RunHistory +from .table_managers.event_types import EventTypeManager from .tasks import ( AdjustLRUSizeTask, AdjustStatisticsTask, @@ -92,6 +95,7 @@ from .tasks import ( ContextIDMigrationTask, DatabaseLockTask, EventTask, + EventTypeIDMigrationTask, ImportStatisticsTask, KeepAliveTask, PerodicCleanupTask, @@ -135,6 +139,7 @@ EXPIRE_AFTER_COMMITS = 120 STATE_ATTRIBUTES_ID_CACHE_SIZE = 2048 EVENT_DATA_ID_CACHE_SIZE = 2048 + SHUTDOWN_TASK = object() COMMIT_TASK = CommitTask() @@ -209,6 +214,7 @@ class Recorder(threading.Thread): self._old_states: dict[str | None, States] = {} self._state_attributes_ids: LRU = LRU(STATE_ATTRIBUTES_ID_CACHE_SIZE) self._event_data_ids: LRU = LRU(EVENT_DATA_ID_CACHE_SIZE) + self.event_type_manager = EventTypeManager() self._pending_state_attributes: dict[str, StateAttributes] = {} self._pending_event_data: dict[str, EventData] = {} self._pending_expunge: list[States] = [] @@ -688,10 +694,26 @@ class Recorder(threading.Thread): _LOGGER.debug("Recorder processing the queue") self._adjust_lru_size() self.hass.add_job(self._async_set_recorder_ready_migration_done) - self.queue_task(ContextIDMigrationTask()) + self._activate_table_managers_or_migrate() self._run_event_loop() self._shutdown() + def _activate_table_managers_or_migrate(self) -> None: + """Activate the table managers or schedule migrations.""" + # Currently we 
always check if context ids need to be migrated + # since there are multiple tables. This could be optimized + # to check both the states and events table to see if there + # are any missing and avoid inserting the task but it currently + # is not needed since there is no dependent code branching + # on the result of the migration. + self.queue_task(ContextIDMigrationTask()) + with session_scope(session=self.get_session()) as session: + if session.execute(has_event_type_to_migrate()).scalar(): + self.queue_task(EventTypeIDMigrationTask()) + else: + _LOGGER.debug("Activating event type manager as all data is migrated") + self.event_type_manager.active = True + def _run_event_loop(self) -> None: """Run the event loop for the recorder.""" # Use a session for the event read loop @@ -724,8 +746,10 @@ class Recorder(threading.Thread): else: non_state_change_events.append(event_) + assert self.event_session is not None self._pre_process_state_change_events(state_change_events) self._pre_process_non_state_change_events(non_state_change_events) + self.event_type_manager.load(non_state_change_events, self.event_session) def _pre_process_state_change_events(self, events: list[Event]) -> None: """Load startup state attributes from the database. @@ -944,13 +968,30 @@ class Recorder(threading.Thread): def _process_non_state_changed_event_into_session(self, event: Event) -> None: """Process any event into the session except state changed.""" - assert self.event_session is not None + event_session = self.event_session + assert event_session is not None dbevent = Events.from_event(event) + + # Map the event_type to the EventTypes table + event_type_manager = self.event_type_manager + if pending_event_types := event_type_manager.get_pending(event.event_type): + dbevent.event_type_rel = pending_event_types + elif event_type_id := event_type_manager.get(event.event_type, event_session): + dbevent.event_type_id = event_type_id + else: + event_types = EventTypes(event_type=event.event_type) + event_type_manager.add_pending(event_types) + event_session.add(event_types) + dbevent.event_type_rel = event_types + if not event.data: - self.event_session.add(dbevent) + event_session.add(dbevent) return + if not (shared_data_bytes := self._serialize_event_data_from_event(event)): return + + # Map the event data to the EventData table shared_data = shared_data_bytes.decode("utf-8") # Matching attributes found in the pending commit if pending_event_data := self._pending_event_data.get(shared_data): @@ -969,9 +1010,9 @@ class Recorder(threading.Thread): dbevent.event_data_rel = self._pending_event_data[ shared_data ] = dbevent_data - self.event_session.add(dbevent_data) + event_session.add(dbevent_data) - self.event_session.add(dbevent) + event_session.add(dbevent) def _serialize_state_attributes_from_event(self, event: Event) -> bytes | None: """Serialize state changed event data.""" @@ -1096,6 +1137,7 @@ class Recorder(threading.Thread): for event_data in self._pending_event_data.values(): self._event_data_ids[event_data.shared_data] = event_data.data_id self._pending_event_data = {} + self.event_type_manager.post_commit_pending() # Expire is an expensive operation (frequently more expensive # than the flush and commit itself) so we only @@ -1122,6 +1164,7 @@ class Recorder(threading.Thread): self._event_data_ids.clear() self._pending_state_attributes.clear() self._pending_event_data.clear() + self.event_type_manager.reset() if not self.event_session: return @@ -1152,6 +1195,10 @@ class Recorder(threading.Thread): 
"""Migrate context ids if needed.""" return migration.migrate_context_ids(self) + def _migrate_event_type_ids(self) -> bool: + """Migrate event type ids if needed.""" + return migration.migrate_event_type_ids(self) + def _send_keep_alive(self) -> None: """Send a keep alive to keep the db connection open.""" assert self.event_session is not None diff --git a/homeassistant/components/recorder/db_schema.py b/homeassistant/components/recorder/db_schema.py index 01b15dd3781..9499e9d4e31 100644 --- a/homeassistant/components/recorder/db_schema.py +++ b/homeassistant/components/recorder/db_schema.py @@ -68,12 +68,13 @@ class Base(DeclarativeBase): """Base class for tables.""" -SCHEMA_VERSION = 36 +SCHEMA_VERSION = 37 _LOGGER = logging.getLogger(__name__) TABLE_EVENTS = "events" TABLE_EVENT_DATA = "event_data" +TABLE_EVENT_TYPES = "event_types" TABLE_STATES = "states" TABLE_STATE_ATTRIBUTES = "state_attributes" TABLE_RECORDER_RUNS = "recorder_runs" @@ -93,6 +94,7 @@ ALL_TABLES = [ TABLE_STATE_ATTRIBUTES, TABLE_EVENTS, TABLE_EVENT_DATA, + TABLE_EVENT_TYPES, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES, TABLE_STATISTICS, @@ -176,7 +178,9 @@ class Events(Base): __table_args__ = ( # Used for fetching events at a specific time # see logbook - Index("ix_events_event_type_time_fired_ts", "event_type", "time_fired_ts"), + Index( + "ix_events_event_type_id_time_fired_ts", "event_type_id", "time_fired_ts" + ), Index( EVENTS_CONTEXT_ID_BIN_INDEX, "context_id_bin", @@ -187,7 +191,9 @@ class Events(Base): ) __tablename__ = TABLE_EVENTS event_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) - event_type: Mapped[str | None] = mapped_column(String(MAX_LENGTH_EVENT_EVENT_TYPE)) + event_type: Mapped[str | None] = mapped_column( + String(MAX_LENGTH_EVENT_EVENT_TYPE) + ) # no longer used event_data: Mapped[str | None] = mapped_column( Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb") ) @@ -220,13 +226,17 @@ class Events(Base): context_parent_id_bin: Mapped[bytes | None] = mapped_column( LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) ) + event_type_id: Mapped[int | None] = mapped_column( + Integer, ForeignKey("event_types.event_type_id"), index=True + ) event_data_rel: Mapped[EventData | None] = relationship("EventData") + event_type_rel: Mapped[EventTypes | None] = relationship("EventTypes") def __repr__(self) -> str: """Return string representation of instance for debugging.""" return ( "" ) @@ -247,7 +257,7 @@ class Events(Base): def from_event(event: Event) -> Events: """Create an event database object from a native event.""" return Events( - event_type=event.event_type, + event_type=None, event_data=None, origin_idx=EVENT_ORIGIN_TO_IDX.get(event.origin), time_fired=None, @@ -330,6 +340,23 @@ class EventData(Base): return {} +class EventTypes(Base): + """Event type history.""" + + __table_args__ = (_DEFAULT_TABLE_ARGS,) + __tablename__ = TABLE_EVENT_TYPES + event_type_id: Mapped[int] = mapped_column(Integer, Identity(), primary_key=True) + event_type: Mapped[str | None] = mapped_column(String(MAX_LENGTH_EVENT_EVENT_TYPE)) + + def __repr__(self) -> str: + """Return string representation of instance for debugging.""" + return ( + "" + ) + + class States(Base): """State change history.""" diff --git a/homeassistant/components/recorder/migration.py b/homeassistant/components/recorder/migration.py index a5ff110e57e..e7a34f22fcc 100644 --- a/homeassistant/components/recorder/migration.py +++ b/homeassistant/components/recorder/migration.py @@ -35,6 +35,7 @@ from .db_schema import ( TABLE_STATES, Base, 
Events, + EventTypes, SchemaChanges, States, Statistics, @@ -44,6 +45,7 @@ from .db_schema import ( ) from .models import process_timestamp from .queries import ( + find_event_type_to_migrate, find_events_context_ids_to_migrate, find_states_context_ids_to_migrate, ) @@ -978,6 +980,11 @@ def _apply_update( # noqa: C901 ) _create_index(session_maker, "events", "ix_events_context_id_bin") _create_index(session_maker, "states", "ix_states_context_id_bin") + elif new_version == 37: + _add_columns(session_maker, "events", [f"event_type_id {big_int}"]) + _create_index(session_maker, "events", "ix_events_event_type_id") + _drop_index(session_maker, "events", "ix_events_event_type_time_fired_ts") + _create_index(session_maker, "events", "ix_events_event_type_id_time_fired_ts") else: raise ValueError(f"No schema migration defined for version {new_version}") @@ -1288,6 +1295,57 @@ def migrate_context_ids(instance: Recorder) -> bool: return is_done +def migrate_event_type_ids(instance: Recorder) -> bool: + """Migrate event_type to event_type_ids.""" + session_maker = instance.get_session + _LOGGER.debug("Migrating event_types") + event_type_manager = instance.event_type_manager + with session_scope(session=session_maker()) as session: + if events := session.execute(find_event_type_to_migrate()).all(): + event_types = {event_type for _, event_type in events} + event_type_to_id = event_type_manager.get_many(event_types, session) + if missing_event_types := { + event_type + for event_type, event_id in event_type_to_id.items() + if event_id is None + }: + missing_db_event_types = [ + EventTypes(event_type=event_type) + for event_type in missing_event_types + ] + session.add_all(missing_db_event_types) + session.flush() # Assign ids + for db_event_type in missing_db_event_types: + # We cannot add the assigned ids to the event_type_manager + # because the commit could get rolled back + assert db_event_type.event_type is not None + event_type_to_id[ + db_event_type.event_type + ] = db_event_type.event_type_id + + session.execute( + update(Events), + [ + { + "event_id": event_id, + "event_type": None, + "event_type_id": event_type_to_id[event_type], + } + for event_id, event_type in events + ], + ) + + # If there is more work to do return False + # so that we can be called again + is_done = not events + + if is_done: + instance.event_type_manager.active = True + + _LOGGER.debug("Migrating event_types done=%s", is_done) + return is_done + + def _initialize_database(session: Session) -> bool: """Initialize a new database. 
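The migration above converges in bounded passes: find_event_type_to_migrate() returns at most SQLITE_MAX_BIND_VARS rows, the missing EventTypes rows are flushed (not committed) so the database assigns their primary keys, and the batch of events is then repointed with a single ORM bulk UPDATE by primary key. Below is a minimal, self-contained sketch of one such pass against an in-memory SQLite database; the trimmed table shapes mirror the schema above, while the engine, sample data, and 64-character type length are assumptions made for illustration.

# One migration pass, sketched standalone; not the recorder's real models.
from sqlalchemy import Column, Integer, String, create_engine, select, update
from sqlalchemy.orm import DeclarativeBase, Session


class Base(DeclarativeBase):
    pass


class EventTypes(Base):
    __tablename__ = "event_types"
    event_type_id = Column(Integer, primary_key=True)
    event_type = Column(String(64))


class Events(Base):
    __tablename__ = "events"
    event_id = Column(Integer, primary_key=True)
    event_type = Column(String(64))
    event_type_id = Column(Integer)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([Events(event_type=t) for t in ("one", "one", "two")])
    session.commit()

with Session(engine) as session:
    # find_event_type_to_migrate() analogue: rows still carrying the string.
    events = session.execute(
        select(Events.event_id, Events.event_type).where(
            Events.event_type_id.is_(None)
        )
    ).all()

    # Insert the missing EventTypes rows; flush() assigns primary keys
    # without committing, as in migrate_event_type_ids().
    missing = [EventTypes(event_type=t) for t in {t for _, t in events}]
    session.add_all(missing)
    session.flush()
    type_to_id = {row.event_type: row.event_type_id for row in missing}

    # ORM bulk UPDATE by primary key: repoint each event, drop the string.
    session.execute(
        update(Events),
        [
            {"event_id": eid, "event_type": None, "event_type_id": type_to_id[t]}
            for eid, t in events
        ],
    )
    session.commit()

Because the new EventTypes rows are only flushed, a rollback cannot leave stale ids behind, which is why the assigned ids are deliberately not promoted into the event type manager's cache during the migration.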
diff --git a/homeassistant/components/recorder/purge.py b/homeassistant/components/recorder/purge.py index 7ae63ef026b..368a6ccdf1c 100644 --- a/homeassistant/components/recorder/purge.py +++ b/homeassistant/components/recorder/purge.py @@ -24,12 +24,14 @@ from .queries import ( data_ids_exist_in_events_with_fast_in_distinct, delete_event_data_rows, delete_event_rows, + delete_event_types_rows, delete_recorder_runs_rows, delete_states_attributes_rows, delete_states_rows, delete_statistics_runs_rows, delete_statistics_short_term_rows, disconnect_states_rows, + find_event_types_to_purge, find_events_to_purge, find_latest_statistics_runs_run_id, find_legacy_event_state_and_attributes_and_data_ids_to_purge, @@ -109,6 +111,11 @@ def purge_old_data( _LOGGER.debug("Cleanup filtered data hasn't fully completed yet") return False + # This purge cycle is finished, clean up old event types and + # recorder runs + if instance.event_type_manager.active: + _purge_old_event_types(instance, session) + _purge_old_recorder_runs(instance, session, purge_before) if repack: repack_database(instance) @@ -564,6 +571,25 @@ def _purge_old_recorder_runs( _LOGGER.debug("Deleted %s recorder_runs", deleted_rows) +def _purge_old_event_types(instance: Recorder, session: Session) -> None: + """Purge all old event types.""" + # Event types is small, no need to batch run it + purge_event_types = set() + event_type_ids = set() + for event_type_id, event_type in session.execute(find_event_types_to_purge()): + purge_event_types.add(event_type) + event_type_ids.add(event_type_id) + + if not event_type_ids: + return + + deleted_rows = session.execute(delete_event_types_rows(event_type_ids)) + _LOGGER.debug("Deleted %s event types", deleted_rows) + + # Evict any entries in the event_type cache referring to a purged state + instance.event_type_manager.evict_purged(purge_event_types) + + def _purge_filtered_data(instance: Recorder, session: Session) -> bool: """Remove filtered states and events that shouldn't be in the database.""" _LOGGER.debug("Cleanup filtered data") diff --git a/homeassistant/components/recorder/queries.py b/homeassistant/components/recorder/queries.py index 217b7ed11bb..d0672e61581 100644 --- a/homeassistant/components/recorder/queries.py +++ b/homeassistant/components/recorder/queries.py @@ -12,6 +12,7 @@ from .const import SQLITE_MAX_BIND_VARS from .db_schema import ( EventData, Events, + EventTypes, RecorderRuns, StateAttributes, States, @@ -20,6 +21,17 @@ from .db_schema import ( ) +def select_event_type_ids(event_types: tuple[str, ...]) -> Select: + """Generate a select for event type ids. + + This query is intentionally not a lambda statement as it is used inside + other lambda statements. 
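That note is the key constraint in this module: lambda_stmt() caches a statement by the lambda's code location, so a helper invoked inside the lambda must contribute a plain Core select() to the one cached statement; a nested lambda element would defeat the cache. A hedged, self-contained sketch of the composition, with trimmed Core tables standing in for the real schema:

# Why select_event_type_ids() must stay a plain select(). Table shapes
# and names below are illustrative stand-ins.
from sqlalchemy import Column, Integer, MetaData, String, Table, lambda_stmt, select

metadata = MetaData()
event_types = Table(
    "event_types",
    metadata,
    Column("event_type_id", Integer, primary_key=True),
    Column("event_type", String(64)),
)
events = Table(
    "events",
    metadata,
    Column("event_id", Integer, primary_key=True),
    Column("event_type_id", Integer),
)


def select_event_type_ids(types: tuple[str, ...]):
    # Plain Core select: safe to embed inside lambda statements.
    return select(event_types.c.event_type_id).where(
        event_types.c.event_type.in_(types)
    )


def find_events(types: tuple[str, ...]):
    # The lambda body is analyzed once; on later calls only the closed-over
    # `types` tuple is extracted as bound parameters.
    return lambda_stmt(
        lambda: select(events.c.event_id).where(
            events.c.event_type_id.in_(select_event_type_ids(types))
        )
    )


print(find_events(("call_service", "automation_triggered")))

The statement structure is built and cached on the first call; subsequent calls reuse it with fresh parameter values, which is what makes this composition cheap on the recorder's hot query paths.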
+ """ + return select(EventTypes.event_type_id).where( + EventTypes.event_type.in_(event_types) + ) + + def get_shared_attributes(hashes: list[int]) -> StatementLambdaElement: """Load shared attributes from the database.""" return lambda_stmt( @@ -38,6 +50,15 @@ def get_shared_event_datas(hashes: list[int]) -> StatementLambdaElement: ) +def find_event_type_ids(event_types: Iterable[str]) -> StatementLambdaElement: + """Find an event_type id by event_type.""" + return lambda_stmt( + lambda: select(EventTypes.event_type_id, EventTypes.event_type).filter( + EventTypes.event_type.in_(event_types) + ) + ) + + def find_shared_attributes_id( data_hash: int, shared_attrs: str ) -> StatementLambdaElement: @@ -683,6 +704,25 @@ def find_events_context_ids_to_migrate() -> StatementLambdaElement: ) +def find_event_type_to_migrate() -> StatementLambdaElement: + """Find events event_type to migrate.""" + return lambda_stmt( + lambda: select( + Events.event_id, + Events.event_type, + ) + .filter(Events.event_type_id.is_(None)) + .limit(SQLITE_MAX_BIND_VARS) + ) + + +def has_event_type_to_migrate() -> StatementLambdaElement: + """Check if there are event_types to migrate.""" + return lambda_stmt( + lambda: select(Events.event_id).filter(Events.event_type_id.is_(None)).limit(1) + ) + + def find_states_context_ids_to_migrate() -> StatementLambdaElement: """Find events context_ids to migrate.""" return lambda_stmt( @@ -695,3 +735,29 @@ def find_states_context_ids_to_migrate() -> StatementLambdaElement: .filter(States.context_id_bin.is_(None)) .limit(SQLITE_MAX_BIND_VARS) ) + + +def find_event_types_to_purge() -> StatementLambdaElement: + """Find event_type_ids to purge.""" + return lambda_stmt( + lambda: select(EventTypes.event_type_id, EventTypes.event_type).where( + EventTypes.event_type_id.not_in( + select(EventTypes.event_type_id).join( + used_event_type_ids := select( + distinct(Events.event_type_id).label("used_event_type_id") + ).subquery(), + EventTypes.event_type_id + == used_event_type_ids.c.used_event_type_id, + ) + ) + ) + ) + + +def delete_event_types_rows(event_type_ids: Iterable[int]) -> StatementLambdaElement: + """Delete EventTypes rows.""" + return lambda_stmt( + lambda: delete(EventTypes) + .where(EventTypes.event_type_id.in_(event_type_ids)) + .execution_options(synchronize_session=False) + ) diff --git a/homeassistant/components/recorder/table_managers/event_types.py b/homeassistant/components/recorder/table_managers/event_types.py new file mode 100644 index 00000000000..15dfff28b88 --- /dev/null +++ b/homeassistant/components/recorder/table_managers/event_types.py @@ -0,0 +1,87 @@ +"""Support managing EventTypes.""" +from __future__ import annotations + +from collections.abc import Iterable +from typing import cast + +from lru import LRU # pylint: disable=no-name-in-module +from sqlalchemy.orm.session import Session + +from homeassistant.core import Event + +from ..db_schema import EventTypes +from ..queries import find_event_type_ids + +CACHE_SIZE = 2048 + + +class EventTypeManager: + """Manage the EventTypes table.""" + + def __init__(self) -> None: + """Initialize the event type manager.""" + self._id_map: dict[str, int] = LRU(CACHE_SIZE) + self._pending: dict[str, EventTypes] = {} + self.active = False + + def load(self, events: list[Event], session: Session) -> None: + """Load the event_type to event_type_ids mapping into memory.""" + self.get_many( + (event.event_type for event in events if event.event_type is not None), + session, + ) + + def get(self, event_type: str, session: 
Session) -> int | None: + """Resolve event_type to the event_type_id.""" + return self.get_many((event_type,), session)[event_type] + + def get_many( + self, event_types: Iterable[str], session: Session + ) -> dict[str, int | None]: + """Resolve event_types to event_type_ids.""" + results: dict[str, int | None] = {} + missing: list[str] = [] + for event_type in event_types: + if (event_type_id := self._id_map.get(event_type)) is None: + missing.append(event_type) + + results[event_type] = event_type_id + + if not missing: + return results + + with session.no_autoflush: + for event_type_id, event_type in session.execute( + find_event_type_ids(missing) + ): + results[event_type] = self._id_map[event_type] = cast( + int, event_type_id + ) + + return results + + def get_pending(self, event_type: str) -> EventTypes | None: + """Get pending EventTypes that have not be assigned ids yet.""" + return self._pending.get(event_type) + + def add_pending(self, db_event_type: EventTypes) -> None: + """Add a pending EventTypes that will be committed at the next interval.""" + assert db_event_type.event_type is not None + event_type: str = db_event_type.event_type + self._pending[event_type] = db_event_type + + def post_commit_pending(self) -> None: + """Call after commit to load the event_type_ids of the new EventTypes into the LRU.""" + for event_type, db_event_types in self._pending.items(): + self._id_map[event_type] = db_event_types.event_type_id + self._pending.clear() + + def reset(self) -> None: + """Reset the event manager after the database has been reset or changed.""" + self._id_map.clear() + self._pending.clear() + + def evict_purged(self, event_types: Iterable[str]) -> None: + """Evict purged event_types from the cache when they are no longer used.""" + for event_type in event_types: + self._id_map.pop(event_type, None) diff --git a/homeassistant/components/recorder/tasks.py b/homeassistant/components/recorder/tasks.py index 37a02772572..81a105742b4 100644 --- a/homeassistant/components/recorder/tasks.py +++ b/homeassistant/components/recorder/tasks.py @@ -356,3 +356,19 @@ class ContextIDMigrationTask(RecorderTask): if not instance._migrate_context_ids(): # pylint: disable=[protected-access] # Schedule a new migration task if this one didn't finish instance.queue_task(ContextIDMigrationTask()) + + +@dataclass +class EventTypeIDMigrationTask(RecorderTask): + """An object to insert into the recorder queue to migrate event type ids.""" + + commit_before = True + # We have to commit before to make sure there are + # no new pending event_types about to be added to + # the db since this happens live + + def run(self, instance: Recorder) -> None: + """Run event type id migration task.""" + if not instance._migrate_event_type_ids(): # pylint: disable=[protected-access] + # Schedule a new migration task if this one didn't finish + instance.queue_task(EventTypeIDMigrationTask()) diff --git a/tests/components/history/test_init_db_schema_30.py b/tests/components/history/test_init_db_schema_30.py index 392c06f8433..7c1b7a5e97b 100644 --- a/tests/components/history/test_init_db_schema_30.py +++ b/tests/components/history/test_init_db_schema_30.py @@ -69,7 +69,9 @@ def db_schema_30(): with patch.object(recorder, "db_schema", old_db_schema), patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), patch.object(core, "EventData", old_db_schema.EventData), patch.object( + ), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object( + core, "EventData", 
old_db_schema.EventData + ), patch.object( core, "States", old_db_schema.States ), patch.object( core, "Events", old_db_schema.Events diff --git a/tests/components/recorder/db_schema_23_with_newer_columns.py b/tests/components/recorder/db_schema_23_with_newer_columns.py index 0cd3f414901..c8c87ca82dd 100644 --- a/tests/components/recorder/db_schema_23_with_newer_columns.py +++ b/tests/components/recorder/db_schema_23_with_newer_columns.py @@ -69,10 +69,12 @@ TABLE_STATISTICS_META = "statistics_meta" TABLE_STATISTICS_RUNS = "statistics_runs" TABLE_STATISTICS_SHORT_TERM = "statistics_short_term" TABLE_EVENT_DATA = "event_data" +TABLE_EVENT_TYPES = "event_types" ALL_TABLES = [ TABLE_STATES, TABLE_EVENTS, + TABLE_EVENT_TYPES, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES, TABLE_STATISTICS, @@ -141,9 +143,13 @@ class Events(Base): # type: ignore context_parent_id_bin = Column( LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) ) # *** Not originally in v23, only added for recorder to startup ok + event_type_id = Column( + Integer, ForeignKey("event_types.event_type_id"), index=True + ) # *** Not originally in v23, only added for recorder to startup ok event_data_rel = relationship( "EventData" ) # *** Not originally in v23, only added for recorder to startup ok + event_type_rel = relationship("EventTypes") def __repr__(self) -> str: """Return string representation of instance for debugging.""" @@ -204,6 +210,19 @@ class EventData(Base): # type: ignore[misc,valid-type] shared_data = Column(Text().with_variant(mysql.LONGTEXT, "mysql")) +# *** Not originally in v23, only added for recorder to startup ok +# This is not being tested by the v23 statistics migration tests +class EventTypes(Base): # type: ignore[misc,valid-type] + """Event type history.""" + + __table_args__ = ( + {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, + ) + __tablename__ = TABLE_EVENT_TYPES + event_type_id = Column(Integer, Identity(), primary_key=True) + event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE)) + + class States(Base): # type: ignore """State change history.""" diff --git a/tests/components/recorder/db_schema_28.py b/tests/components/recorder/db_schema_28.py index 422f317a6f1..f7152cec508 100644 --- a/tests/components/recorder/db_schema_28.py +++ b/tests/components/recorder/db_schema_28.py @@ -21,6 +21,7 @@ from sqlalchemy import ( Identity, Index, Integer, + LargeBinary, SmallInteger, String, Text, @@ -54,6 +55,7 @@ DB_TIMEZONE = "+00:00" TABLE_EVENTS = "events" TABLE_EVENT_DATA = "event_data" +TABLE_EVENT_TYPES = "event_types" TABLE_STATES = "states" TABLE_STATE_ATTRIBUTES = "state_attributes" TABLE_RECORDER_RUNS = "recorder_runs" @@ -68,6 +70,7 @@ ALL_TABLES = [ TABLE_STATE_ATTRIBUTES, TABLE_EVENTS, TABLE_EVENT_DATA, + TABLE_EVENT_TYPES, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES, TABLE_STATISTICS, @@ -98,6 +101,11 @@ DOUBLE_TYPE = ( ) EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote] EVENT_ORIGIN_TO_IDX = {origin: idx for idx, origin in enumerate(EVENT_ORIGIN_ORDER)} +CONTEXT_ID_BIN_MAX_LENGTH = 16 +EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin" +STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin" + +TIMESTAMP_TYPE = DOUBLE_TYPE class Events(Base): # type: ignore[misc,valid-type] @@ -107,6 +115,12 @@ class Events(Base): # type: ignore[misc,valid-type] # Used for fetching events at a specific time # see logbook Index("ix_events_event_type_time_fired", "event_type", "time_fired"), + Index( + EVENTS_CONTEXT_ID_BIN_INDEX, + "context_id_bin", + 
mysql_length=CONTEXT_ID_BIN_MAX_LENGTH, + mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH, + ), {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, ) __tablename__ = TABLE_EVENTS @@ -116,11 +130,27 @@ class Events(Base): # type: ignore[misc,valid-type] origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used origin_idx = Column(SmallInteger) time_fired = Column(DATETIME_TYPE, index=True) + time_fired_ts = Column( + TIMESTAMP_TYPE, index=True + ) # *** Not originally in v30, only added for recorder to startup ok context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True) context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) data_id = Column(Integer, ForeignKey("event_data.data_id"), index=True) + context_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v28, only added for recorder to startup ok + context_user_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v28, only added for recorder to startup ok + context_parent_id_bin = Column( + LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) + ) # *** Not originally in v28, only added for recorder to startup ok + event_type_id = Column( + Integer, ForeignKey("event_types.event_type_id"), index=True + ) # *** Not originally in v28, only added for recorder to startup ok event_data_rel = relationship("EventData") + event_type_rel = relationship("EventTypes") def __repr__(self) -> str: """Return string representation of instance for debugging.""" @@ -214,6 +244,19 @@ class EventData(Base): # type: ignore[misc,valid-type] return {} +# *** Not originally in v28, only added for recorder to startup ok +# This is not being tested by the v28 statistics migration tests +class EventTypes(Base): # type: ignore[misc,valid-type] + """Event type history.""" + + __table_args__ = ( + {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, + ) + __tablename__ = TABLE_EVENT_TYPES + event_type_id = Column(Integer, Identity(), primary_key=True) + event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE)) + + class States(Base): # type: ignore[misc,valid-type] """State change history.""" diff --git a/tests/components/recorder/db_schema_30.py b/tests/components/recorder/db_schema_30.py index 7862ad06142..ed9fb89e464 100644 --- a/tests/components/recorder/db_schema_30.py +++ b/tests/components/recorder/db_schema_30.py @@ -64,6 +64,7 @@ _LOGGER = logging.getLogger(__name__) TABLE_EVENTS = "events" TABLE_EVENT_DATA = "event_data" +TABLE_EVENT_TYPES = "event_types" TABLE_STATES = "states" TABLE_STATE_ATTRIBUTES = "state_attributes" TABLE_RECORDER_RUNS = "recorder_runs" @@ -78,6 +79,7 @@ ALL_TABLES = [ TABLE_STATE_ATTRIBUTES, TABLE_EVENTS, TABLE_EVENT_DATA, + TABLE_EVENT_TYPES, TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES, TABLE_STATISTICS, @@ -212,6 +214,9 @@ class Events(Base): # type: ignore[misc,valid-type] origin = Column(String(MAX_LENGTH_EVENT_ORIGIN)) # no longer used for new rows origin_idx = Column(SmallInteger) time_fired = Column(DATETIME_TYPE, index=True) + time_fired_ts = Column( + TIMESTAMP_TYPE, index=True + ) # *** Not originally in v30, only added for recorder to startup ok context_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID), index=True) context_user_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) context_parent_id = Column(String(MAX_LENGTH_EVENT_CONTEXT_ID)) @@ -225,7 +230,11 @@ class Events(Base): # type: ignore[misc,valid-type] context_parent_id_bin = Column( 
LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH) ) # *** Not originally in v30, only added for recorder to startup ok + event_type_id = Column( + Integer, ForeignKey("event_types.event_type_id"), index=True + ) # *** Not originally in v30, only added for recorder to startup ok event_data_rel = relationship("EventData") + event_type_rel = relationship("EventTypes") def __repr__(self) -> str: """Return string representation of instance for debugging.""" @@ -322,6 +331,19 @@ class EventData(Base): # type: ignore[misc,valid-type] return {} +# *** Not originally in v30, only added for recorder to startup ok +# This is not being tested by the v30 statistics migration tests +class EventTypes(Base): # type: ignore[misc,valid-type] + """Event type history.""" + + __table_args__ = ( + {"mysql_default_charset": "utf8mb4", "mysql_collate": "utf8mb4_unicode_ci"}, + ) + __tablename__ = TABLE_EVENT_TYPES + event_type_id = Column(Integer, Identity(), primary_key=True) + event_type = Column(String(MAX_LENGTH_EVENT_EVENT_TYPE)) + + class States(Base): # type: ignore[misc,valid-type] """State change history.""" diff --git a/tests/components/recorder/test_history_db_schema_30.py b/tests/components/recorder/test_history_db_schema_30.py index e0f24b35f97..ae37d50f03b 100644 --- a/tests/components/recorder/test_history_db_schema_30.py +++ b/tests/components/recorder/test_history_db_schema_30.py @@ -65,7 +65,9 @@ def db_schema_30(): with patch.object(recorder, "db_schema", old_db_schema), patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), patch.object(core, "EventData", old_db_schema.EventData), patch.object( + ), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object( + core, "EventData", old_db_schema.EventData + ), patch.object( core, "States", old_db_schema.States ), patch.object( core, "Events", old_db_schema.Events diff --git a/tests/components/recorder/test_init.py b/tests/components/recorder/test_init.py index a810d74556b..c46d77677af 100644 --- a/tests/components/recorder/test_init.py +++ b/tests/components/recorder/test_init.py @@ -39,12 +39,14 @@ from homeassistant.components.recorder.db_schema import ( SCHEMA_VERSION, EventData, Events, + EventTypes, RecorderRuns, StateAttributes, States, StatisticsRuns, ) from homeassistant.components.recorder.models import process_timestamp +from homeassistant.components.recorder.queries import select_event_type_ids from homeassistant.components.recorder.services import ( SERVICE_DISABLE, SERVICE_ENABLE, @@ -483,16 +485,19 @@ def test_saving_event(hass_recorder: Callable[..., HomeAssistant]) -> None: events: list[Event] = [] with session_scope(hass=hass) as session: - for select_event, event_data in ( - session.query(Events, EventData) - .filter_by(event_type=event_type) + for select_event, event_data, event_types in ( + session.query(Events, EventData, EventTypes) + .filter(Events.event_type_id.in_(select_event_type_ids((event_type,)))) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) .outerjoin(EventData, Events.data_id == EventData.data_id) ): select_event = cast(Events, select_event) event_data = cast(EventData, event_data) + event_types = cast(EventTypes, event_types) native_event = select_event.to_native() native_event.data = event_data.to_native() + native_event.event_type = event_types.event_type events.append(native_event) db_event = events[0] @@ -555,15 +560,19 @@ def _add_events(hass, events): with session_scope(hass=hass) as session: events = [] - for event, event_data in 
session.query(Events, EventData).outerjoin( - EventData, Events.data_id == EventData.data_id + for event, event_data, event_types in ( + session.query(Events, EventData, EventTypes) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) + .outerjoin(EventData, Events.data_id == EventData.data_id) ): event = cast(Events, event) event_data = cast(EventData, event_data) + event_types = cast(EventTypes, event_types) native_event = event.to_native() if event_data: native_event.data = event_data.to_native() + native_event.event_type = event_types.event_type events.append(native_event) return events @@ -1349,7 +1358,11 @@ def test_service_disable_events_not_recording( event = events[0] with session_scope(hass=hass) as session: - db_events = list(session.query(Events).filter_by(event_type=event_type)) + db_events = list( + session.query(Events) + .filter(Events.event_type_id.in_(select_event_type_ids((event_type,)))) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) + ) assert len(db_events) == 0 assert hass.services.call( @@ -1369,16 +1382,19 @@ def test_service_disable_events_not_recording( db_events = [] with session_scope(hass=hass) as session: - for select_event, event_data in ( - session.query(Events, EventData) - .filter_by(event_type=event_type) + for select_event, event_data, event_types in ( + session.query(Events, EventData, EventTypes) + .filter(Events.event_type_id.in_(select_event_type_ids((event_type,)))) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) .outerjoin(EventData, Events.data_id == EventData.data_id) ): select_event = cast(Events, select_event) event_data = cast(EventData, event_data) + event_types = cast(EventTypes, event_types) native_event = select_event.to_native() native_event.data = event_data.to_native() + native_event.event_type = event_types.event_type db_events.append(native_event) assert len(db_events) == 1 @@ -1558,6 +1574,7 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None: hass = hass_recorder( {"include": {"domains": "hello"}, "exclude": {"domains": "hidden_domain"}} ) + event_types = ("hello",) for idx, data in enumerate( ( @@ -1572,7 +1589,11 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None: wait_recording_done(hass) with session_scope(hass=hass) as session: - db_events = list(session.query(Events).filter_by(event_type="hello")) + db_events = list( + session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(event_types)) + ) + ) assert len(db_events) == idx + 1, data for data in ( @@ -1583,7 +1604,11 @@ def test_entity_id_filter(hass_recorder: Callable[..., HomeAssistant]) -> None: wait_recording_done(hass) with session_scope(hass=hass) as session: - db_events = list(session.query(Events).filter_by(event_type="hello")) + db_events = list( + session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(event_types)) + ) + ) # Keep referring idx + 1, as no new events are being added assert len(db_events) == idx + 1, data @@ -1608,10 +1633,16 @@ async def test_database_lock_and_unlock( } await async_setup_recorder_instance(hass, config) await hass.async_block_till_done() + event_type = "EVENT_TEST" + event_types = (event_type,) def _get_db_events(): with session_scope(hass=hass) as session: - return list(session.query(Events).filter_by(event_type=event_type)) + return list( + session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(event_types)) + ) + ) 
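All of these test rewrites follow from one fact: new rows are written with event_type=None, so filter_by(event_type=...) can no longer match them. The replacement pattern filters on event_type_id through an in-database subselect and outer-joins EventTypes to recover the type string. A runnable sketch of that reading-side pattern against in-memory SQLite (trimmed stand-in tables, not the recorder's real declarative models):

# Reading events back after the split: filter by id, join to get the name.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Session


class Base(DeclarativeBase):
    pass


class EventTypes(Base):
    __tablename__ = "event_types"
    event_type_id = Column(Integer, primary_key=True)
    event_type = Column(String(64))


class Events(Base):
    __tablename__ = "events"
    event_id = Column(Integer, primary_key=True)
    event_type_id = Column(Integer, ForeignKey("event_types.event_type_id"))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(EventTypes(event_type_id=1, event_type="EVENT_TEST"))
    session.add(Events(event_id=1, event_type_id=1))
    session.commit()

    # select_event_type_ids() analogue: resolve names to ids in-database.
    type_ids = select(EventTypes.event_type_id).where(
        EventTypes.event_type.in_(("EVENT_TEST",))
    )
    rows = session.execute(
        select(Events.event_id, EventTypes.event_type)
        .where(Events.event_type_id.in_(type_ids))
        .outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id)
    ).all()
    print(rows)  # [(1, 'EVENT_TEST')]

Keeping the name-to-id resolution as a subselect means the string never has to be fetched separately before the events query runs, which is the same design the logbook queries rely on above.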
instance = get_instance(hass) @@ -1619,7 +1650,6 @@ async def test_database_lock_and_unlock( assert not await instance.lock_database() - event_type = "EVENT_TEST" event_data = {"test_attr": 5, "test_attr_10": "nice"} hass.bus.async_fire(event_type, event_data) task = asyncio.create_task(async_wait_recording_done(hass)) @@ -1658,10 +1688,16 @@ async def test_database_lock_and_overflow( } await async_setup_recorder_instance(hass, config) await hass.async_block_till_done() + event_type = "EVENT_TEST" + event_types = (event_type,) def _get_db_events(): with session_scope(hass=hass) as session: - return list(session.query(Events).filter_by(event_type=event_type)) + return list( + session.query(Events).filter( + Events.event_type_id.in_(select_event_type_ids(event_types)) + ) + ) instance = get_instance(hass) @@ -1670,7 +1706,6 @@ async def test_database_lock_and_overflow( ): await instance.lock_database() - event_type = "EVENT_TEST" event_data = {"test_attr": 5, "test_attr_10": "nice"} hass.bus.fire(event_type, event_data) @@ -1793,9 +1828,11 @@ def test_deduplication_event_data_inside_commit_interval( wait_recording_done(hass) with session_scope(hass=hass) as session: + event_types = ("this_event",) events = list( session.query(Events) - .filter(Events.event_type == "this_event") + .filter(Events.event_type_id.in_(select_event_type_ids(event_types))) + .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id)) .outerjoin(EventData, (Events.data_id == EventData.data_id)) ) assert len(events) == 20 diff --git a/tests/components/recorder/test_migrate.py b/tests/components/recorder/test_migrate.py index 730d90e14ba..062013e7280 100644 --- a/tests/components/recorder/test_migrate.py +++ b/tests/components/recorder/test_migrate.py @@ -25,10 +25,15 @@ from homeassistant.components.recorder import db_schema, migration from homeassistant.components.recorder.db_schema import ( SCHEMA_VERSION, Events, + EventTypes, RecorderRuns, States, ) -from homeassistant.components.recorder.tasks import ContextIDMigrationTask +from homeassistant.components.recorder.queries import select_event_type_ids +from homeassistant.components.recorder.tasks import ( + ContextIDMigrationTask, + EventTypeIDMigrationTask, +) from homeassistant.components.recorder.util import session_scope from homeassistant.core import HomeAssistant from homeassistant.helpers import recorder as recorder_helper @@ -688,3 +693,74 @@ async def test_migrate_context_ids( assert invalid_context_id_event["context_id_bin"] == b"\x00" * 16 assert invalid_context_id_event["context_user_id_bin"] is None assert invalid_context_id_event["context_parent_id_bin"] is None + + +@pytest.mark.parametrize("enable_migrate_event_type_ids", [True]) +async def test_migrate_event_type_ids( + async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant +) -> None: + """Test we can migrate event_types to the EventTypes table.""" + instance = await async_setup_recorder_instance(hass) + await async_wait_recording_done(hass) + + def _insert_events(): + with session_scope(hass=hass) as session: + session.add_all( + ( + Events( + event_type="event_type_one", + origin_idx=0, + time_fired_ts=1677721632.452529, + ), + Events( + event_type="event_type_one", + origin_idx=0, + time_fired_ts=1677721632.552529, + ), + Events( + event_type="event_type_two", + origin_idx=0, + time_fired_ts=1677721632.552529, + ), + ) + ) + + await instance.async_add_executor_job(_insert_events) + + await async_wait_recording_done(hass) + # This is a threadsafe way to add a 
task to the recorder + instance.queue_task(EventTypeIDMigrationTask()) + await async_recorder_block_till_done(hass) + + def _fetch_migrated_events(): + with session_scope(hass=hass) as session: + events = ( + session.query(Events.event_id, Events.time_fired, EventTypes.event_type) + .filter( + Events.event_type_id.in_( + select_event_type_ids( + ( + "event_type_one", + "event_type_two", + ) + ) + ) + ) + .outerjoin(EventTypes, Events.event_type_id == EventTypes.event_type_id) + .all() + ) + assert len(events) == 3 + result = {} + for event in events: + result.setdefault(event.event_type, []).append( + { + "event_id": event.event_id, + "time_fired": event.time_fired, + "event_type": event.event_type, + } + ) + return result + + events_by_type = await instance.async_add_executor_job(_fetch_migrated_events) + assert len(events_by_type["event_type_one"]) == 2 + assert len(events_by_type["event_type_two"]) == 1 diff --git a/tests/components/recorder/test_models.py b/tests/components/recorder/test_models.py index 6f4de420b7b..df8ffa0d348 100644 --- a/tests/components/recorder/test_models.py +++ b/tests/components/recorder/test_models.py @@ -31,6 +31,7 @@ def test_from_event_to_db_event() -> None: db_event = Events.from_event(event) dialect = SupportedDialect.MYSQL db_event.event_data = EventData.shared_data_bytes_from_event(event, dialect) + db_event.event_type = event.event_type assert event.as_dict() == db_event.to_native().as_dict() @@ -232,11 +233,13 @@ async def test_event_to_db_model() -> None: db_event = Events.from_event(event) dialect = SupportedDialect.MYSQL db_event.event_data = EventData.shared_data_bytes_from_event(event, dialect) + db_event.event_type = event.event_type native = db_event.to_native() assert native.as_dict() == event.as_dict() native = Events.from_event(event).to_native() event.data = {} + native.event_type = event.event_type assert native.as_dict() == event.as_dict() diff --git a/tests/components/recorder/test_purge.py b/tests/components/recorder/test_purge.py index 07c935129e9..fcabb2e83a8 100644 --- a/tests/components/recorder/test_purge.py +++ b/tests/components/recorder/test_purge.py @@ -16,6 +16,7 @@ from homeassistant.components.recorder.const import ( from homeassistant.components.recorder.db_schema import ( EventData, Events, + EventTypes, RecorderRuns, StateAttributes, States, @@ -31,6 +32,7 @@ from homeassistant.components.recorder.tasks import PurgeTask from homeassistant.components.recorder.util import session_scope from homeassistant.const import EVENT_STATE_CHANGED, EVENT_THEMES_UPDATED, STATE_ON from homeassistant.core import HomeAssistant +from homeassistant.helpers.json import json_dumps from homeassistant.helpers.typing import ConfigType from homeassistant.util import dt as dt_util @@ -1684,3 +1686,113 @@ async def test_purge_can_mix_legacy_and_new_format( # does not prevent future purges. Its ignored. 
assert states_with_event_id.count() == 0 assert states_without_event_id.count() == 1 + + +async def test_purge_old_events_purges_the_event_type_ids( + async_setup_recorder_instance: RecorderInstanceGenerator, hass: HomeAssistant +) -> None: + """Test deleting old events purges event type ids.""" + instance = await async_setup_recorder_instance(hass) + assert instance.event_type_manager.active is True + + utcnow = dt_util.utcnow() + five_days_ago = utcnow - timedelta(days=5) + eleven_days_ago = utcnow - timedelta(days=11) + far_past = utcnow - timedelta(days=1000) + event_data = {"test_attr": 5, "test_attr_10": "nice"} + + await hass.async_block_till_done() + await async_wait_recording_done(hass) + + def _insert_events(): + with session_scope(hass=hass) as session: + event_type_test_auto_purge = EventTypes(event_type="EVENT_TEST_AUTOPURGE") + event_type_test_purge = EventTypes(event_type="EVENT_TEST_PURGE") + event_type_test = EventTypes(event_type="EVENT_TEST") + event_type_unused = EventTypes(event_type="EVENT_TEST_UNUSED") + session.add_all( + ( + event_type_test_auto_purge, + event_type_test_purge, + event_type_test, + event_type_unused, + ) + ) + session.flush() + for _ in range(5): + for event_id in range(6): + if event_id < 2: + timestamp = eleven_days_ago + event_type = event_type_test_auto_purge + elif event_id < 4: + timestamp = five_days_ago + event_type = event_type_test_purge + else: + timestamp = utcnow + event_type = event_type_test + + session.add( + Events( + event_type=None, + event_type_id=event_type.event_type_id, + event_data=json_dumps(event_data), + origin="LOCAL", + time_fired_ts=dt_util.utc_to_timestamp(timestamp), + ) + ) + return instance.event_type_manager.get_many( + [ + "EVENT_TEST_AUTOPURGE", + "EVENT_TEST_PURGE", + "EVENT_TEST", + "EVENT_TEST_UNUSED", + ], + session, + ) + + event_type_to_id = await instance.async_add_executor_job(_insert_events) + test_event_type_ids = event_type_to_id.values() + with session_scope(hass=hass) as session: + events = session.query(Events).where( + Events.event_type_id.in_(test_event_type_ids) + ) + event_types = session.query(EventTypes).where( + EventTypes.event_type_id.in_(test_event_type_ids) + ) + + assert events.count() == 30 + assert event_types.count() == 4 + + # run purge_old_data() + finished = purge_old_data( + instance, + far_past, + repack=False, + ) + assert finished + assert events.count() == 30 + # We should remove the unused event type + assert event_types.count() == 3 + + assert "EVENT_TEST_UNUSED" not in instance.event_type_manager._id_map + + # we should only have 10 events left since + # only one event type was recorded now + finished = purge_old_data( + instance, + utcnow, + repack=False, + ) + assert finished + assert events.count() == 10 + assert event_types.count() == 1 + + # Purge everything + finished = purge_old_data( + instance, + utcnow + timedelta(seconds=1), + repack=False, + ) + assert finished + assert events.count() == 0 + assert event_types.count() == 0 diff --git a/tests/components/recorder/test_v32_migration.py b/tests/components/recorder/test_v32_migration.py index e31d4472aaf..6fe810758fb 100644 --- a/tests/components/recorder/test_v32_migration.py +++ b/tests/components/recorder/test_v32_migration.py @@ -11,6 +11,7 @@ from sqlalchemy.orm import Session from homeassistant.components import recorder from homeassistant.components.recorder import SQLITE_URL_PREFIX, core, statistics +from homeassistant.components.recorder.queries import select_event_type_ids from 
homeassistant.components.recorder.util import session_scope from homeassistant.core import EVENT_STATE_CHANGED, Event, EventOrigin, State from homeassistant.helpers import recorder as recorder_helper @@ -87,7 +88,9 @@ def test_migrate_times(caplog: pytest.LogCaptureFixture, tmpdir) -> None: with patch.object(recorder, "db_schema", old_db_schema), patch.object( recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION - ), patch.object(core, "EventData", old_db_schema.EventData), patch.object( + ), patch.object(core, "EventTypes", old_db_schema.EventTypes), patch.object( + core, "EventData", old_db_schema.EventData + ), patch.object( core, "States", old_db_schema.States ), patch.object( core, "Events", old_db_schema.Events @@ -117,8 +120,10 @@ def test_migrate_times(caplog: pytest.LogCaptureFixture, tmpdir) -> None: wait_recording_done(hass) with session_scope(hass=hass) as session: result = list( - session.query(recorder.db_schema.Events).where( - recorder.db_schema.Events.event_type == "custom_event" + session.query(recorder.db_schema.Events).filter( + recorder.db_schema.Events.event_type_id.in_( + select_event_type_ids(("custom_event",)) + ) ) ) assert len(result) == 1 diff --git a/tests/conftest.py b/tests/conftest.py index ed5a95f1b25..25ee8143829 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1148,6 +1148,16 @@ def enable_migrate_context_ids() -> bool: return False +@pytest.fixture +def enable_migrate_event_type_ids() -> bool: + """Fixture to control enabling of recorder's event type id migration. + + To enable context id migration, tests can be marked with: + @pytest.mark.parametrize("enable_migrate_event_type_ids", [True]) + """ + return False + + @pytest.fixture def recorder_config() -> dict[str, Any] | None: """Fixture to override recorder config. @@ -1291,6 +1301,7 @@ async def async_setup_recorder_instance( enable_statistics: bool, enable_statistics_table_validation: bool, enable_migrate_context_ids: bool, + enable_migrate_event_type_ids: bool, ) -> AsyncGenerator[RecorderInstanceGenerator, None]: """Yield callable to setup recorder instance.""" # pylint: disable-next=import-outside-toplevel @@ -1309,6 +1320,11 @@ async def async_setup_recorder_instance( migrate_context_ids = ( recorder.Recorder._migrate_context_ids if enable_migrate_context_ids else None ) + migrate_event_type_ids = ( + recorder.Recorder._migrate_event_type_ids + if enable_migrate_event_type_ids + else None + ) with patch( "homeassistant.components.recorder.Recorder.async_nightly_tasks", side_effect=nightly, @@ -1325,6 +1341,10 @@ async def async_setup_recorder_instance( "homeassistant.components.recorder.Recorder._migrate_context_ids", side_effect=migrate_context_ids, autospec=True, + ), patch( + "homeassistant.components.recorder.Recorder._migrate_event_type_ids", + side_effect=migrate_event_type_ids, + autospec=True, ): async def async_setup_recorder(
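One detail the purge test above leans on ("EVENT_TEST_UNUSED" not in instance.event_type_manager._id_map): the manager's id map is a fixed-size LRU from the third-party lru-dict package, so entries age out on their own under normal churn, but rows deleted by a purge must be evicted explicitly, otherwise a later event could be pointed at an id that no longer exists in event_types. A small runnable sketch of both eviction paths; the tiny cache size is for demonstration only:

# LRU cache behavior behind EventTypeManager.evict_purged().
from lru import LRU  # pip install lru-dict

id_map = LRU(2)  # the real cache uses CACHE_SIZE = 2048
id_map["EVENT_TEST"] = 1
id_map["EVENT_TEST_PURGE"] = 2
id_map["EVENT_TEST_UNUSED"] = 3  # evicts EVENT_TEST, the least recently used
print("EVENT_TEST" in id_map)  # False: aged out by normal LRU pressure

# evict_purged() analogue: after delete_event_types_rows() removes an
# unused type, its cached id must be dropped too.
for purged in ("EVENT_TEST_UNUSED",):
    id_map.pop(purged, None)
print("EVENT_TEST_UNUSED" in id_map)  # False: evicted explicitly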