mirror of
https://github.com/home-assistant/core.git
synced 2025-08-08 15:15:09 +02:00
fix stale docs strings, remove dead code
This commit is contained in:
@@ -50,15 +50,6 @@ def get_shared_event_datas(hashes: list[int]) -> StatementLambdaElement:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def find_event_type_id(event_type: str) -> StatementLambdaElement:
|
|
||||||
"""Find an event_type id by event_type."""
|
|
||||||
return lambda_stmt(
|
|
||||||
lambda: select(EventTypes.event_type_id).filter(
|
|
||||||
EventTypes.event_type == event_type
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def find_event_type_ids(event_types: Iterable[str]) -> StatementLambdaElement:
|
def find_event_type_ids(event_types: Iterable[str]) -> StatementLambdaElement:
|
||||||
"""Find an event_type id by event_type."""
|
"""Find an event_type id by event_type."""
|
||||||
return lambda_stmt(
|
return lambda_stmt(
|
||||||
|
@@ -16,29 +16,29 @@ CACHE_SIZE = 2048
|
|||||||
|
|
||||||
|
|
||||||
class EventTypeManager:
|
class EventTypeManager:
|
||||||
"""Manage event types."""
|
"""Manage the EventTypes table."""
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
"""Initialize the event manager."""
|
"""Initialize the event type manager."""
|
||||||
self._id_map: dict[str, int] = LRU(CACHE_SIZE)
|
self._id_map: dict[str, int] = LRU(CACHE_SIZE)
|
||||||
self._pending: dict[str, EventTypes] = {}
|
self._pending: dict[str, EventTypes] = {}
|
||||||
self.active = False
|
self.active = False
|
||||||
|
|
||||||
def load(self, events: list[Event], session: Session) -> None:
|
def load(self, events: list[Event], session: Session) -> None:
|
||||||
"""Load the event types into memory."""
|
"""Load the event_type to event_type_ids mapping into memory."""
|
||||||
self.get_many(
|
self.get_many(
|
||||||
(event.event_type for event in events if event.event_type is not None),
|
(event.event_type for event in events if event.event_type is not None),
|
||||||
session,
|
session,
|
||||||
)
|
)
|
||||||
|
|
||||||
def get(self, event_type: str, session: Session) -> int | None:
|
def get(self, event_type: str, session: Session) -> int | None:
|
||||||
"""Resolve events to event data."""
|
"""Resolve event_type to the event_type_id."""
|
||||||
return self.get_many((event_type,), session)[event_type]
|
return self.get_many((event_type,), session)[event_type]
|
||||||
|
|
||||||
def get_many(
|
def get_many(
|
||||||
self, event_types: Iterable[str], session: Session
|
self, event_types: Iterable[str], session: Session
|
||||||
) -> dict[str, int | None]:
|
) -> dict[str, int | None]:
|
||||||
"""Resolve events to event data."""
|
"""Resolve event_types to event_type_ids."""
|
||||||
results: dict[str, int | None] = {}
|
results: dict[str, int | None] = {}
|
||||||
missing: list[str] = []
|
missing: list[str] = []
|
||||||
for event_type in event_types:
|
for event_type in event_types:
|
||||||
@@ -61,27 +61,27 @@ class EventTypeManager:
|
|||||||
return results
|
return results
|
||||||
|
|
||||||
def get_pending(self, event_type: str) -> EventTypes | None:
|
def get_pending(self, event_type: str) -> EventTypes | None:
|
||||||
"""Get pending event type."""
|
"""Get pending EventTypes that have not be assigned ids yet."""
|
||||||
return self._pending.get(event_type)
|
return self._pending.get(event_type)
|
||||||
|
|
||||||
def add_pending(self, db_event_type: EventTypes) -> None:
|
def add_pending(self, db_event_type: EventTypes) -> None:
|
||||||
"""Add a pending event."""
|
"""Add a pending EventTypes that will be committed at the next interval."""
|
||||||
assert db_event_type.event_type is not None
|
assert db_event_type.event_type is not None
|
||||||
event_type: str = db_event_type.event_type
|
event_type: str = db_event_type.event_type
|
||||||
self._pending[event_type] = db_event_type
|
self._pending[event_type] = db_event_type
|
||||||
|
|
||||||
def post_commit_pending(self) -> None:
|
def post_commit_pending(self) -> None:
|
||||||
"""Flush pending events."""
|
"""Call after commit to load the event_type_ids of the new EventTypes into the LRU."""
|
||||||
for event_type, db_event_types in self._pending.items():
|
for event_type, db_event_types in self._pending.items():
|
||||||
self._id_map[event_type] = db_event_types.event_type_id
|
self._id_map[event_type] = db_event_types.event_type_id
|
||||||
self._pending.clear()
|
self._pending.clear()
|
||||||
|
|
||||||
def reset(self) -> None:
|
def reset(self) -> None:
|
||||||
"""Reset the event manager."""
|
"""Reset the event manager after the database has been reset or changed."""
|
||||||
self._id_map.clear()
|
self._id_map.clear()
|
||||||
self._pending.clear()
|
self._pending.clear()
|
||||||
|
|
||||||
def evict_purged(self, event_types: Iterable[str]) -> None:
|
def evict_purged(self, event_types: Iterable[str]) -> None:
|
||||||
"""Evict purged event types."""
|
"""Evict purged event_types from the cache when they are no longer used."""
|
||||||
for event_type in event_types:
|
for event_type in event_types:
|
||||||
self._id_map.pop(event_type, None)
|
self._id_map.pop(event_type, None)
|
||||||
|
Reference in New Issue
Block a user