From 19d2d023ab2ad4a900b325fd27c1c9df049e9a2f Mon Sep 17 00:00:00 2001 From: "J. Nick Koston" Date: Sun, 14 Jul 2024 16:20:52 -0500 Subject: [PATCH] Ensure states table rebuild still happens if the event_id index was removed (#121938) * Ensure states table rebuild still happens if the event_id index was removed If ix_states_event_id was removed but the foreign key still exists, the states table would not get rebuilt. This should not happen under normal circumstances and seems to only be possible if the index was removed manually or Home Assistant was restarted forcefully in the middle of a previous migration from years ago. * cover * fix tests * mysql won't allow at that point but that's ok as long as it's gone at the end --- homeassistant/components/recorder/core.py | 36 +++- .../components/recorder/test_v32_migration.py | 188 +++++++++++++++--- tests/conftest.py | 21 ++ 3 files changed, 208 insertions(+), 37 deletions(-) diff --git a/homeassistant/components/recorder/core.py b/homeassistant/components/recorder/core.py index db9f4239480..5d7d81f38d2 100644 --- a/homeassistant/components/recorder/core.py +++ b/homeassistant/components/recorder/core.py @@ -16,7 +16,14 @@ import time from typing import TYPE_CHECKING, Any, cast import psutil_home_assistant as ha_psutil -from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select, update +from sqlalchemy import ( + create_engine, + event as sqlalchemy_event, + exc, + inspect, + select, + update, +) from sqlalchemy.engine import Engine from sqlalchemy.engine.interfaces import DBAPIConnection from sqlalchemy.exc import SQLAlchemyError @@ -820,7 +827,7 @@ class Recorder(threading.Thread): # If ix_states_entity_id_last_updated_ts still exists # on the states table it means the entity id migration # finished by the EntityIDPostMigrationTask did not - # because they restarted in the middle of it. We need + # complete because they restarted in the middle of it. We need # to pick back up where we left off. 
if get_index_by_name( session, @@ -832,9 +839,13 @@ class Recorder(threading.Thread): if self.schema_version > LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION: with contextlib.suppress(SQLAlchemyError): # If the index of event_ids on the states table is still present - # we need to queue a task to remove it. - if get_index_by_name( - session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX + # or the event_id foreign key still exists we need to queue a + # task to remove it. + if ( + get_index_by_name( + session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX + ) + or self._legacy_event_id_foreign_key_exists() ): self.queue_task(EventIdMigrationTask()) self.use_legacy_events_index = True @@ -1285,6 +1296,21 @@ class Recorder(threading.Thread): """Run post schema migration tasks.""" migration.post_schema_migration(self, old_version, new_version) + def _legacy_event_id_foreign_key_exists(self) -> bool: + """Check if the legacy event_id foreign key exists.""" + engine = self.engine + assert engine is not None + return bool( + next( + ( + fk + for fk in inspect(engine).get_foreign_keys(TABLE_STATES) + if fk["constrained_columns"] == ["event_id"] + ), + None, + ) + ) + def _migrate_states_context_ids(self) -> bool: """Migrate states context ids if needed.""" return migration.migrate_states_context_ids(self) diff --git a/tests/components/recorder/test_v32_migration.py b/tests/components/recorder/test_v32_migration.py index 4e809d02446..666629d4bcf 100644 --- a/tests/components/recorder/test_v32_migration.py +++ b/tests/components/recorder/test_v32_migration.py @@ -3,14 +3,14 @@ from datetime import timedelta import importlib import sys -from unittest.mock import patch +from unittest.mock import DEFAULT, patch import pytest from sqlalchemy import create_engine, inspect from sqlalchemy.orm import Session from homeassistant.components import recorder -from homeassistant.components.recorder import core, statistics +from homeassistant.components.recorder import core, migration, statistics 
from homeassistant.components.recorder.queries import select_event_type_ids from homeassistant.components.recorder.util import session_scope from homeassistant.core import EVENT_STATE_CHANGED, Event, EventOrigin, State @@ -104,21 +104,14 @@ async def test_migrate_times( patch.object(core, "States", old_db_schema.States), patch.object(core, "Events", old_db_schema.Events), patch(CREATE_ENGINE_TARGET, new=_create_engine_test), - patch( - "homeassistant.components.recorder.Recorder._migrate_events_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_states_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_event_type_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_entity_ids", - ), - patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"), - patch( - "homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids" + patch.multiple( + "homeassistant.components.recorder.Recorder", + _migrate_events_context_ids=DEFAULT, + _migrate_states_context_ids=DEFAULT, + _migrate_event_type_ids=DEFAULT, + _migrate_entity_ids=DEFAULT, + _post_migrate_entity_ids=DEFAULT, + _cleanup_legacy_states_event_ids=DEFAULT, ), ): async with ( @@ -267,21 +260,14 @@ async def test_migrate_can_resume_entity_id_post_migration( patch.object(core, "States", old_db_schema.States), patch.object(core, "Events", old_db_schema.Events), patch(CREATE_ENGINE_TARGET, new=_create_engine_test), - patch( - "homeassistant.components.recorder.Recorder._migrate_events_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_states_context_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_event_type_ids", - ), - patch( - "homeassistant.components.recorder.Recorder._migrate_entity_ids", - ), - patch("homeassistant.components.recorder.Recorder._post_migrate_entity_ids"), - patch( - 
"homeassistant.components.recorder.Recorder._cleanup_legacy_states_event_ids" + patch.multiple( + "homeassistant.components.recorder.Recorder", + _migrate_events_context_ids=DEFAULT, + _migrate_states_context_ids=DEFAULT, + _migrate_event_type_ids=DEFAULT, + _migrate_entity_ids=DEFAULT, + _post_migrate_entity_ids=DEFAULT, + _cleanup_legacy_states_event_ids=DEFAULT, ), ): async with ( @@ -328,5 +314,143 @@ async def test_migrate_can_resume_entity_id_post_migration( states_indexes = await instance.async_add_executor_job(_get_states_index_names) states_index_names = {index["name"] for index in states_indexes} assert "ix_states_entity_id_last_updated_ts" not in states_index_names + assert "ix_states_event_id" not in states_index_names + + await hass.async_stop() + + +@pytest.mark.parametrize("persistent_database", [True]) +@pytest.mark.usefixtures("hass_storage") # Prevent test hass from writing to storage +async def test_migrate_can_resume_ix_states_event_id_removed( + async_test_recorder: RecorderInstanceGenerator, + caplog: pytest.LogCaptureFixture, + recorder_db_url: str, +) -> None: + """Test we resume the entity id post migration after a restart. + + This case tests the migration still happens if + ix_states_event_id is removed from the states table. 
+ """ + importlib.import_module(SCHEMA_MODULE) + old_db_schema = sys.modules[SCHEMA_MODULE] + now = dt_util.utcnow() + one_second_past = now - timedelta(seconds=1) + mock_state = State( + "sensor.test", + "old", + {"last_reset": now.isoformat()}, + last_changed=one_second_past, + last_updated=now, + ) + state_changed_event = Event( + EVENT_STATE_CHANGED, + { + "entity_id": "sensor.test", + "old_state": None, + "new_state": mock_state, + }, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + custom_event = Event( + "custom_event", + {"entity_id": "sensor.custom"}, + EventOrigin.local, + time_fired_timestamp=now.timestamp(), + ) + number_of_migrations = 5 + + def _get_event_id_foreign_keys(): + assert instance.engine is not None + return next( + ( + fk # type: ignore[misc] + for fk in inspect(instance.engine).get_foreign_keys("states") + if fk["constrained_columns"] == ["event_id"] + ), + None, + ) + + def _get_states_index_names(): + with session_scope(hass=hass) as session: + return inspect(session.connection()).get_indexes("states") + + with ( + patch.object(recorder, "db_schema", old_db_schema), + patch.object( + recorder.migration, "SCHEMA_VERSION", old_db_schema.SCHEMA_VERSION + ), + patch.object(core, "StatesMeta", old_db_schema.StatesMeta), + patch.object(core, "EventTypes", old_db_schema.EventTypes), + patch.object(core, "EventData", old_db_schema.EventData), + patch.object(core, "States", old_db_schema.States), + patch.object(core, "Events", old_db_schema.Events), + patch(CREATE_ENGINE_TARGET, new=_create_engine_test), + patch.multiple( + "homeassistant.components.recorder.Recorder", + _migrate_events_context_ids=DEFAULT, + _migrate_states_context_ids=DEFAULT, + _migrate_event_type_ids=DEFAULT, + _migrate_entity_ids=DEFAULT, + _post_migrate_entity_ids=DEFAULT, + _cleanup_legacy_states_event_ids=DEFAULT, + ), + ): + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await 
hass.async_block_till_done() + await async_wait_recording_done(hass) + await async_wait_recording_done(hass) + + def _add_data(): + with session_scope(hass=hass) as session: + session.add(old_db_schema.Events.from_event(custom_event)) + session.add(old_db_schema.States.from_event(state_changed_event)) + + await instance.async_add_executor_job(_add_data) + await hass.async_block_till_done() + await instance.async_block_till_done() + + await instance.async_add_executor_job( + migration._drop_index, + instance.get_session, + "states", + "ix_states_event_id", + ) + + states_indexes = await instance.async_add_executor_job( + _get_states_index_names + ) + states_index_names = {index["name"] for index in states_indexes} + assert instance.use_legacy_events_index is True + assert ( + await instance.async_add_executor_job(_get_event_id_foreign_keys) + is not None + ) + + await hass.async_stop() + await hass.async_block_till_done() + + assert "ix_states_entity_id_last_updated_ts" in states_index_names + + async with ( + async_test_home_assistant() as hass, + async_test_recorder(hass) as instance, + ): + await hass.async_block_till_done() + + # We need to wait for all the migration tasks to complete + # before we can check the database. 
+ for _ in range(number_of_migrations): + await instance.async_block_till_done() + await async_wait_recording_done(hass) + + states_indexes = await instance.async_add_executor_job(_get_states_index_names) + states_index_names = {index["name"] for index in states_indexes} + assert "ix_states_entity_id_last_updated_ts" not in states_index_names + assert "ix_states_event_id" not in states_index_names + assert await instance.async_add_executor_job(_get_event_id_foreign_keys) is None await hass.async_stop() diff --git a/tests/conftest.py b/tests/conftest.py index b96bd783331..85f4671f6c0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1300,6 +1300,16 @@ def enable_migrate_entity_ids() -> bool: return False +@pytest.fixture +def enable_migrate_event_ids() -> bool: + """Fixture to control enabling of recorder's event id migration. + + To enable event id migration, tests can be marked with: + @pytest.mark.parametrize("enable_migrate_event_ids", [True]) + """ + return False + + @pytest.fixture def recorder_config() -> dict[str, Any] | None: """Fixture to override recorder config. 
@@ -1416,6 +1426,7 @@ async def async_test_recorder( enable_migrate_context_ids: bool, enable_migrate_event_type_ids: bool, enable_migrate_entity_ids: bool, + enable_migrate_event_ids: bool, ) -> AsyncGenerator[RecorderInstanceGenerator]: """Yield context manager to setup recorder instance.""" # pylint: disable-next=import-outside-toplevel @@ -1457,6 +1468,11 @@ async def async_test_recorder( migrate_entity_ids = ( recorder.Recorder._migrate_entity_ids if enable_migrate_entity_ids else None ) + legacy_event_id_foreign_key_exists = ( + recorder.Recorder._legacy_event_id_foreign_key_exists + if enable_migrate_event_ids + else None + ) with ( patch( "homeassistant.components.recorder.Recorder.async_nightly_tasks", @@ -1493,6 +1509,11 @@ async def async_test_recorder( side_effect=migrate_entity_ids, autospec=True, ), + patch( + "homeassistant.components.recorder.Recorder._legacy_event_id_foreign_key_exists", + side_effect=legacy_event_id_foreign_key_exists, + autospec=True, + ), patch( "homeassistant.components.recorder.Recorder._schedule_compile_missing_statistics", side_effect=compile_missing,