mirror of
https://github.com/home-assistant/core.git
synced 2026-04-14 05:36:13 +02:00
Compare commits
25 Commits
dev
...
improve_lo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d91d05750a | ||
|
|
6438c44895 | ||
|
|
8b5310efe9 | ||
|
|
46900109e0 | ||
|
|
604c1c44e8 | ||
|
|
fc8170ba12 | ||
|
|
231b69d433 | ||
|
|
1057cee988 | ||
|
|
804ac0c6a0 | ||
|
|
7cd2d67cca | ||
|
|
f987b1485f | ||
|
|
e7adeb616a | ||
|
|
943bfabf6d | ||
|
|
c5a6b9c002 | ||
|
|
acee1d48f1 | ||
|
|
baa103acc5 | ||
|
|
3957e409ca | ||
|
|
cc7e4422fd | ||
|
|
99d166e6e8 | ||
|
|
545ead149a | ||
|
|
42c85b9863 | ||
|
|
93b1401e4b | ||
|
|
278a3b4928 | ||
|
|
f69b0de2c7 | ||
|
|
4cd93b0e7c |
@@ -11,7 +11,9 @@ from homeassistant.const import (
|
||||
ATTR_DEVICE_ID,
|
||||
ATTR_DOMAIN,
|
||||
ATTR_ENTITY_ID,
|
||||
ATTR_SERVICE_DATA,
|
||||
ATTR_UNIT_OF_MEASUREMENT,
|
||||
EVENT_CALL_SERVICE,
|
||||
EVENT_LOGBOOK_ENTRY,
|
||||
EVENT_STATE_CHANGED,
|
||||
)
|
||||
@@ -104,10 +106,22 @@ def async_determine_event_types(
|
||||
|
||||
|
||||
@callback
|
||||
def extract_attr(source: Mapping[str, Any], attr: str) -> list[str]:
|
||||
"""Extract an attribute as a list or string."""
|
||||
def extract_attr(
|
||||
event_type: EventType[Any] | str, source: Mapping[str, Any], attr: str
|
||||
) -> list[str]:
|
||||
"""Extract an attribute as a list or string.
|
||||
|
||||
For EVENT_CALL_SERVICE events, the entity_id is inside service_data,
|
||||
not at the top level. Check service_data as a fallback.
|
||||
"""
|
||||
if (value := source.get(attr)) is None:
|
||||
return []
|
||||
# Early return to avoid unnecessary dict lookups for non-service events
|
||||
if event_type != EVENT_CALL_SERVICE:
|
||||
return []
|
||||
if service_data := source.get(ATTR_SERVICE_DATA):
|
||||
value = service_data.get(attr)
|
||||
if value is None:
|
||||
return []
|
||||
if isinstance(value, list):
|
||||
return value
|
||||
return str(value).split(",")
|
||||
@@ -135,7 +149,7 @@ def event_forwarder_filtered(
|
||||
def _forward_events_filtered_by_entities_filter(event: Event) -> None:
|
||||
assert entities_filter is not None
|
||||
event_data = event.data
|
||||
entity_ids = extract_attr(event_data, ATTR_ENTITY_ID)
|
||||
entity_ids = extract_attr(event.event_type, event_data, ATTR_ENTITY_ID)
|
||||
if entity_ids and not any(
|
||||
entities_filter(entity_id) for entity_id in entity_ids
|
||||
):
|
||||
@@ -157,9 +171,12 @@ def event_forwarder_filtered(
|
||||
@callback
|
||||
def _forward_events_filtered_by_device_entity_ids(event: Event) -> None:
|
||||
event_data = event.data
|
||||
event_type = event.event_type
|
||||
if entity_ids_set.intersection(
|
||||
extract_attr(event_data, ATTR_ENTITY_ID)
|
||||
) or device_ids_set.intersection(extract_attr(event_data, ATTR_DEVICE_ID)):
|
||||
extract_attr(event_type, event_data, ATTR_ENTITY_ID)
|
||||
) or device_ids_set.intersection(
|
||||
extract_attr(event_type, event_data, ATTR_DEVICE_ID)
|
||||
):
|
||||
target(event)
|
||||
|
||||
return _forward_events_filtered_by_device_entity_ids
|
||||
|
||||
@@ -162,7 +162,10 @@ def async_event_to_row(event: Event) -> EventAsRow:
|
||||
# that are missing new_state or old_state
|
||||
# since the logbook does not show these
|
||||
new_state: State = event.data["new_state"]
|
||||
context = new_state.context
|
||||
# Use the event's context rather than the state's context because
|
||||
# State.expire() replaces the context with a copy that loses
|
||||
# origin_event, which is needed for context augmentation.
|
||||
context = event.context
|
||||
return EventAsRow(
|
||||
row_id=hash(event),
|
||||
event_type=None,
|
||||
|
||||
@@ -3,14 +3,16 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable, Generator, Sequence
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime as dt
|
||||
import logging
|
||||
import time
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from lru import LRU
|
||||
from sqlalchemy.engine import Result
|
||||
from sqlalchemy.engine.row import Row
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from homeassistant.components.recorder import get_instance
|
||||
from homeassistant.components.recorder.filters import Filters
|
||||
@@ -37,6 +39,7 @@ from homeassistant.const import (
|
||||
from homeassistant.core import HomeAssistant, split_entity_id
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.util.collection import chunked_or_all
|
||||
from homeassistant.util.event_type import EventType
|
||||
|
||||
from .const import (
|
||||
@@ -80,10 +83,18 @@ from .models import (
|
||||
async_event_to_row,
|
||||
)
|
||||
from .queries import statement_for_request
|
||||
from .queries.common import PSEUDO_EVENT_STATE_CHANGED
|
||||
from .queries.common import (
|
||||
PSEUDO_EVENT_STATE_CHANGED,
|
||||
select_context_user_ids_for_context_ids,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Bound for the parent-context user-id cache — only needs to bridge the
|
||||
# historical→live handoff, so the in-flight set is realistically ~tens with
|
||||
# peak bursts of ~100. Ceiling bounds memory in pathological cases.
|
||||
MAX_CONTEXT_USER_IDS_CACHE = 256
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class LogbookRun:
|
||||
@@ -99,6 +110,14 @@ class LogbookRun:
|
||||
include_entity_name: bool
|
||||
timestamp: bool
|
||||
memoize_new_contexts: bool = True
|
||||
# True when this run will switch to a live stream; gates population of
|
||||
# context_user_ids (wasted work for one-shot REST/get_events callers).
|
||||
for_live_stream: bool = False
|
||||
# context_id -> user_id for parent context attribution; persisted across
|
||||
# batches so child rows can inherit user_id from a parent seen earlier.
|
||||
context_user_ids: LRU[bytes, bytes] = field(
|
||||
default_factory=lambda: LRU(MAX_CONTEXT_USER_IDS_CACHE)
|
||||
)
|
||||
|
||||
|
||||
class EventProcessor:
|
||||
@@ -113,6 +132,7 @@ class EventProcessor:
|
||||
context_id: str | None = None,
|
||||
timestamp: bool = False,
|
||||
include_entity_name: bool = True,
|
||||
for_live_stream: bool = False,
|
||||
) -> None:
|
||||
"""Init the event stream."""
|
||||
assert not (context_id and (entity_ids or device_ids)), (
|
||||
@@ -133,6 +153,7 @@ class EventProcessor:
|
||||
entity_name_cache=EntityNameCache(self.hass),
|
||||
include_entity_name=include_entity_name,
|
||||
timestamp=timestamp,
|
||||
for_live_stream=for_live_stream,
|
||||
)
|
||||
self.context_augmenter = ContextAugmenter(self.logbook_run)
|
||||
|
||||
@@ -180,13 +201,67 @@ class EventProcessor:
|
||||
self.filters,
|
||||
self.context_id,
|
||||
)
|
||||
return self.humanify(
|
||||
execute_stmt_lambda_element(session, stmt, orm_rows=False)
|
||||
rows = execute_stmt_lambda_element(session, stmt, orm_rows=False)
|
||||
query_parent_user_ids: dict[bytes, bytes] | None = None
|
||||
if self.entity_ids or self.device_ids:
|
||||
# Filtered queries exclude parent call_service rows for
|
||||
# unrelated targets, so child contexts lose user attribution
|
||||
# without a pre-pass. all_stmt already includes them.
|
||||
rows = list(rows)
|
||||
query_parent_user_ids = self._fetch_parent_user_ids(
|
||||
session, rows, instance.max_bind_vars
|
||||
)
|
||||
return self.humanify(rows, query_parent_user_ids)
|
||||
|
||||
def _fetch_parent_user_ids(
|
||||
self,
|
||||
session: Session,
|
||||
rows: list[Row],
|
||||
max_bind_vars: int,
|
||||
) -> dict[bytes, bytes] | None:
|
||||
"""Resolve parent-context user_ids for rows in a filtered query.
|
||||
|
||||
Done in Python rather than as a SQL union branch because the
|
||||
context_parent_id_bin column is sparsely populated — scanning the
|
||||
States table for non-null parents costs ~40% of the overall query
|
||||
on real datasets. Here we collect only the parent ids we actually
|
||||
need and fetch them via an indexed point-lookup on context_id_bin.
|
||||
"""
|
||||
cache = self.logbook_run.context_user_ids
|
||||
pending: set[bytes] = {
|
||||
parent_id
|
||||
for row in rows
|
||||
if (parent_id := row[CONTEXT_PARENT_ID_BIN_POS]) and parent_id not in cache
|
||||
}
|
||||
if not pending:
|
||||
return None
|
||||
query_parent_user_ids: dict[bytes, bytes] = {}
|
||||
# The lambda statement unions events and states, so each id appears
|
||||
# in two IN clauses — halve the chunk size to stay under the
|
||||
# database's max bind variable count.
|
||||
for pending_chunk in chunked_or_all(pending, max_bind_vars // 2):
|
||||
# Schema allows NULL but the query's WHERE clauses exclude it;
|
||||
# explicit checks satisfy the type checker.
|
||||
query_parent_user_ids.update(
|
||||
{
|
||||
parent_id: user_id
|
||||
for parent_id, user_id in execute_stmt_lambda_element(
|
||||
session,
|
||||
select_context_user_ids_for_context_ids(pending_chunk),
|
||||
orm_rows=False,
|
||||
)
|
||||
if parent_id is not None and user_id is not None
|
||||
}
|
||||
)
|
||||
if self.logbook_run.for_live_stream:
|
||||
cache.update(query_parent_user_ids)
|
||||
return query_parent_user_ids
|
||||
|
||||
def humanify(
|
||||
self, rows: Generator[EventAsRow] | Sequence[Row] | Result
|
||||
) -> list[dict[str, str]]:
|
||||
self,
|
||||
rows: Generator[EventAsRow] | Sequence[Row] | Result,
|
||||
query_parent_user_ids: dict[bytes, bytes] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Humanify rows."""
|
||||
return list(
|
||||
_humanify(
|
||||
@@ -195,6 +270,7 @@ class EventProcessor:
|
||||
self.ent_reg,
|
||||
self.logbook_run,
|
||||
self.context_augmenter,
|
||||
query_parent_user_ids,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -205,6 +281,7 @@ def _humanify(
|
||||
ent_reg: er.EntityRegistry,
|
||||
logbook_run: LogbookRun,
|
||||
context_augmenter: ContextAugmenter,
|
||||
query_parent_user_ids: dict[bytes, bytes] | None,
|
||||
) -> Generator[dict[str, Any]]:
|
||||
"""Generate a converted list of events into entries."""
|
||||
# Continuous sensors, will be excluded from the logbook
|
||||
@@ -220,11 +297,21 @@ def _humanify(
|
||||
context_id_bin: bytes
|
||||
data: dict[str, Any]
|
||||
|
||||
context_user_ids = logbook_run.context_user_ids
|
||||
# Skip the LRU write on one-shot runs — the LogbookRun is discarded.
|
||||
populate_context_user_ids = logbook_run.for_live_stream
|
||||
|
||||
# Process rows
|
||||
for row in rows:
|
||||
context_id_bin = row[CONTEXT_ID_BIN_POS]
|
||||
if memoize_new_contexts and context_id_bin not in context_lookup:
|
||||
context_lookup[context_id_bin] = row
|
||||
if (
|
||||
populate_context_user_ids
|
||||
and (context_user_id_bin := row[CONTEXT_USER_ID_BIN_POS])
|
||||
and context_id_bin not in context_user_ids
|
||||
):
|
||||
context_user_ids[context_id_bin] = context_user_id_bin
|
||||
if row[CONTEXT_ONLY_POS]:
|
||||
continue
|
||||
event_type = row[EVENT_TYPE_POS]
|
||||
@@ -307,6 +394,28 @@ def _humanify(
|
||||
):
|
||||
context_augmenter.augment(data, context_row)
|
||||
|
||||
# Fall back to the parent context for child contexts that inherit
|
||||
# user attribution (e.g., generic_thermostat -> switch turn_on).
|
||||
# Read from context_lookup directly instead of get_context() to
|
||||
# avoid the origin_event fallback which would return the *child*
|
||||
# row's origin event, not the parent's.
|
||||
if CONTEXT_USER_ID not in data and (
|
||||
context_parent_id_bin := row[CONTEXT_PARENT_ID_BIN_POS]
|
||||
):
|
||||
parent_user_id_bin: bytes | None = context_user_ids.get(
|
||||
context_parent_id_bin
|
||||
)
|
||||
if parent_user_id_bin is None and query_parent_user_ids is not None:
|
||||
parent_user_id_bin = query_parent_user_ids.get(context_parent_id_bin)
|
||||
if (
|
||||
parent_user_id_bin is None
|
||||
and (parent_row := context_lookup.get(context_parent_id_bin))
|
||||
is not None
|
||||
):
|
||||
parent_user_id_bin = parent_row[CONTEXT_USER_ID_BIN_POS]
|
||||
if parent_user_id_bin:
|
||||
data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(parent_user_id_bin)
|
||||
|
||||
yield data
|
||||
|
||||
|
||||
|
||||
@@ -2,12 +2,14 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Collection
|
||||
from typing import Final
|
||||
|
||||
import sqlalchemy
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy import lambda_stmt, select, union_all
|
||||
from sqlalchemy.sql.elements import BooleanClauseList, ColumnElement
|
||||
from sqlalchemy.sql.expression import literal
|
||||
from sqlalchemy.sql.lambdas import StatementLambdaElement
|
||||
from sqlalchemy.sql.selectable import Select
|
||||
|
||||
from homeassistant.components.recorder.db_schema import (
|
||||
@@ -122,6 +124,26 @@ def select_events_context_id_subquery(
|
||||
)
|
||||
|
||||
|
||||
def select_context_user_ids_for_context_ids(
|
||||
context_ids: Collection[bytes],
|
||||
) -> StatementLambdaElement:
|
||||
"""Select (context_id_bin, context_user_id_bin) for the given context ids.
|
||||
|
||||
Union of events and states since a parent context can originate from
|
||||
either table (e.g., a state set directly via the API).
|
||||
"""
|
||||
return lambda_stmt(
|
||||
lambda: union_all(
|
||||
select(Events.context_id_bin, Events.context_user_id_bin)
|
||||
.where(Events.context_id_bin.in_(context_ids))
|
||||
.where(Events.context_user_id_bin.is_not(None)),
|
||||
select(States.context_id_bin, States.context_user_id_bin)
|
||||
.where(States.context_id_bin.in_(context_ids))
|
||||
.where(States.context_user_id_bin.is_not(None)),
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def select_events_context_only() -> Select:
|
||||
"""Generate an events query that mark them as for context_only.
|
||||
|
||||
|
||||
@@ -14,11 +14,13 @@ import voluptuous as vol
|
||||
from homeassistant.components import websocket_api
|
||||
from homeassistant.components.recorder import get_instance
|
||||
from homeassistant.components.websocket_api import ActiveConnection, messages
|
||||
from homeassistant.const import EVENT_CALL_SERVICE
|
||||
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
|
||||
from homeassistant.helpers.event import async_track_point_in_utc_time
|
||||
from homeassistant.helpers.json import json_bytes
|
||||
from homeassistant.util import dt as dt_util
|
||||
from homeassistant.util.async_ import create_eager_task
|
||||
from homeassistant.util.event_type import EventType
|
||||
|
||||
from .const import DOMAIN
|
||||
from .helpers import (
|
||||
@@ -289,6 +291,8 @@ async def ws_event_stream(
|
||||
return
|
||||
|
||||
event_types = async_determine_event_types(hass, entity_ids, device_ids)
|
||||
# A past end_time makes this a one-shot fetch that never goes live.
|
||||
will_go_live = not (end_time and end_time <= utc_now)
|
||||
event_processor = EventProcessor(
|
||||
hass,
|
||||
event_types,
|
||||
@@ -297,6 +301,7 @@ async def ws_event_stream(
|
||||
None,
|
||||
timestamp=True,
|
||||
include_entity_name=False,
|
||||
for_live_stream=will_go_live,
|
||||
)
|
||||
|
||||
if end_time and end_time <= utc_now:
|
||||
@@ -357,11 +362,20 @@ async def ws_event_stream(
|
||||
logbook_config: LogbookConfig = hass.data[DOMAIN]
|
||||
entities_filter = logbook_config.entity_filter
|
||||
|
||||
# Live subscription needs call_service events so the live consumer can
|
||||
# cache parent user_ids as they fire. Historical queries don't — the
|
||||
# context_only join fetches them by context_id regardless of type.
|
||||
# Unfiltered streams already include it via BUILT_IN_EVENTS.
|
||||
live_event_types: tuple[EventType[Any] | str, ...] = (
|
||||
event_types
|
||||
if EVENT_CALL_SERVICE in event_types
|
||||
else (*event_types, EVENT_CALL_SERVICE)
|
||||
)
|
||||
async_subscribe_events(
|
||||
hass,
|
||||
subscriptions,
|
||||
_queue_or_cancel,
|
||||
event_types,
|
||||
live_event_types,
|
||||
entities_filter,
|
||||
entity_ids,
|
||||
device_ids,
|
||||
|
||||
@@ -13,7 +13,16 @@ from homeassistant.components.recorder.models import (
|
||||
ulid_to_bytes_or_none,
|
||||
uuid_hex_to_bytes_or_none,
|
||||
)
|
||||
from homeassistant.core import Context
|
||||
from homeassistant.const import (
|
||||
ATTR_DOMAIN,
|
||||
ATTR_ENTITY_ID,
|
||||
ATTR_FRIENDLY_NAME,
|
||||
ATTR_SERVICE,
|
||||
EVENT_CALL_SERVICE,
|
||||
STATE_OFF,
|
||||
STATE_ON,
|
||||
)
|
||||
from homeassistant.core import Context, HomeAssistant
|
||||
from homeassistant.helpers import entity_registry as er
|
||||
from homeassistant.helpers.json import JSONEncoder
|
||||
from homeassistant.util import dt as dt_util
|
||||
@@ -65,6 +74,97 @@ class MockRow:
|
||||
return process_timestamp_to_utc_isoformat(self.time_fired)
|
||||
|
||||
|
||||
def setup_thermostat_context_test_entities(hass_: HomeAssistant) -> None:
|
||||
"""Set up initial states for the thermostat context chain test entities."""
|
||||
hass_.states.async_set(
|
||||
"climate.living_room",
|
||||
"off",
|
||||
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
|
||||
)
|
||||
hass_.states.async_set("switch.heater", STATE_OFF)
|
||||
|
||||
|
||||
def simulate_thermostat_context_chain(
|
||||
hass_: HomeAssistant,
|
||||
user_id: str = "b400facee45711eaa9308bfd3d19e474",
|
||||
) -> tuple[Context, Context]:
|
||||
"""Simulate the generic_thermostat context chain.
|
||||
|
||||
Fires events in the realistic order:
|
||||
1. EVENT_CALL_SERVICE for set_hvac_mode (parent context)
|
||||
2. EVENT_CALL_SERVICE for homeassistant.turn_on (child context)
|
||||
3. Climate state changes off → heat (parent context)
|
||||
4. Switch state changes off → on (child context)
|
||||
|
||||
Returns the (parent_context, child_context) tuple.
|
||||
"""
|
||||
parent_context = Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVAAA",
|
||||
user_id=user_id,
|
||||
)
|
||||
child_context = Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVDDD",
|
||||
parent_id=parent_context.id,
|
||||
)
|
||||
|
||||
hass_.bus.async_fire(
|
||||
EVENT_CALL_SERVICE,
|
||||
{
|
||||
ATTR_DOMAIN: "climate",
|
||||
ATTR_SERVICE: "set_hvac_mode",
|
||||
"service_data": {ATTR_ENTITY_ID: "climate.living_room"},
|
||||
},
|
||||
context=parent_context,
|
||||
)
|
||||
hass_.bus.async_fire(
|
||||
EVENT_CALL_SERVICE,
|
||||
{
|
||||
ATTR_DOMAIN: "homeassistant",
|
||||
ATTR_SERVICE: "turn_on",
|
||||
"service_data": {ATTR_ENTITY_ID: "switch.heater"},
|
||||
},
|
||||
context=child_context,
|
||||
)
|
||||
hass_.states.async_set(
|
||||
"climate.living_room",
|
||||
"heat",
|
||||
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
|
||||
context=parent_context,
|
||||
)
|
||||
hass_.states.async_set(
|
||||
"switch.heater",
|
||||
STATE_ON,
|
||||
{ATTR_FRIENDLY_NAME: "Heater"},
|
||||
context=child_context,
|
||||
)
|
||||
return parent_context, child_context
|
||||
|
||||
|
||||
def assert_thermostat_context_chain_events(
|
||||
events: list[dict[str, Any]], parent_context: Context
|
||||
) -> None:
|
||||
"""Assert the logbook events for a thermostat context chain.
|
||||
|
||||
Verifies that climate and switch state changes have correct
|
||||
state, user attribution, and service call context.
|
||||
"""
|
||||
climate_entries = [e for e in events if e.get("entity_id") == "climate.living_room"]
|
||||
assert len(climate_entries) == 1
|
||||
assert climate_entries[0]["state"] == "heat"
|
||||
assert climate_entries[0]["context_user_id"] == parent_context.user_id
|
||||
assert climate_entries[0]["context_event_type"] == EVENT_CALL_SERVICE
|
||||
assert climate_entries[0]["context_domain"] == "climate"
|
||||
assert climate_entries[0]["context_service"] == "set_hvac_mode"
|
||||
|
||||
heater_entries = [e for e in events if e.get("entity_id") == "switch.heater"]
|
||||
assert len(heater_entries) == 1
|
||||
assert heater_entries[0]["state"] == "on"
|
||||
assert heater_entries[0]["context_user_id"] == parent_context.user_id
|
||||
assert heater_entries[0]["context_event_type"] == EVENT_CALL_SERVICE
|
||||
assert heater_entries[0]["context_domain"] == "homeassistant"
|
||||
assert heater_entries[0]["context_service"] == "turn_on"
|
||||
|
||||
|
||||
def mock_humanify(hass_, rows):
|
||||
"""Wrap humanify with mocked logbook objects."""
|
||||
entity_name_cache = processor.EntityNameCache(hass_)
|
||||
@@ -89,5 +189,6 @@ def mock_humanify(hass_, rows):
|
||||
ent_reg,
|
||||
logbook_run,
|
||||
context_augmenter,
|
||||
None,
|
||||
),
|
||||
)
|
||||
|
||||
@@ -20,6 +20,7 @@ from homeassistant.components.logbook.models import EventAsRow, LazyEventPartial
|
||||
from homeassistant.components.logbook.processor import EventProcessor
|
||||
from homeassistant.components.logbook.queries.common import PSEUDO_EVENT_STATE_CHANGED
|
||||
from homeassistant.components.recorder import Recorder
|
||||
from homeassistant.components.recorder.models import ulid_to_bytes_or_none
|
||||
from homeassistant.components.script import EVENT_SCRIPT_STARTED
|
||||
from homeassistant.components.sensor import SensorStateClass
|
||||
from homeassistant.const import (
|
||||
@@ -41,13 +42,19 @@ from homeassistant.const import (
|
||||
STATE_OFF,
|
||||
STATE_ON,
|
||||
)
|
||||
from homeassistant.core import Event, HomeAssistant
|
||||
from homeassistant.core import Context, Event, HomeAssistant
|
||||
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
||||
from homeassistant.helpers.entityfilter import CONF_ENTITY_GLOBS
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .common import MockRow, mock_humanify
|
||||
from .common import (
|
||||
MockRow,
|
||||
assert_thermostat_context_chain_events,
|
||||
mock_humanify,
|
||||
setup_thermostat_context_test_entities,
|
||||
simulate_thermostat_context_chain,
|
||||
)
|
||||
|
||||
from tests.common import MockConfigEntry, async_capture_events, mock_platform
|
||||
from tests.components.recorder.common import (
|
||||
@@ -3002,3 +3009,346 @@ async def test_logbook_with_non_iterable_entity_filter(hass: HomeAssistant) -> N
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_user_id_from_parent_context(
|
||||
hass: HomeAssistant, hass_client: ClientSessionGenerator
|
||||
) -> None:
|
||||
"""Test user attribution is inherited through the full context chain.
|
||||
|
||||
Simulates the generic_thermostat pattern:
|
||||
1. User calls set_hvac_mode → parent context (has user_id)
|
||||
- Climate state changes off → heat (parent context)
|
||||
2. Thermostat calls homeassistant.turn_on → child context (no user_id)
|
||||
- SERVICE_CALL event fired (child context)
|
||||
3. Switch state changes off → on (child context)
|
||||
|
||||
All entries should have user_id attributed, either directly (step 1)
|
||||
or inherited from the parent context (steps 2-3).
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
setup_thermostat_context_test_entities(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
parent_context, _ = simulate_thermostat_context_chain(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
client = await hass_client()
|
||||
|
||||
start = dt_util.utcnow().date()
|
||||
start_date = datetime(start.year, start.month, start.day, tzinfo=dt_util.UTC)
|
||||
end_time = start_date + timedelta(hours=24)
|
||||
|
||||
response = await client.get(
|
||||
f"/api/logbook/{start_date.isoformat()}",
|
||||
params={"end_time": end_time.isoformat()},
|
||||
)
|
||||
assert response.status == HTTPStatus.OK
|
||||
json_dict = await response.json()
|
||||
|
||||
assert_thermostat_context_chain_events(json_dict, parent_context)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_user_id_from_parent_context_state_changes_only(
|
||||
hass: HomeAssistant, hass_client: ClientSessionGenerator
|
||||
) -> None:
|
||||
"""Test user attribution is inherited when only state changes are present.
|
||||
|
||||
Same chain as the full test but without the EVENT_CALL_SERVICE event.
|
||||
This exercises the code path where context_lookup resolves the child
|
||||
context to the state change row itself, and augment walks up to the
|
||||
parent state change.
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
|
||||
await async_recorder_block_till_done(hass)
|
||||
|
||||
# Set initial states so that subsequent changes are real state transitions
|
||||
hass.states.async_set(
|
||||
"climate.living_room",
|
||||
"off",
|
||||
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
|
||||
)
|
||||
hass.states.async_set("switch.heater", STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Parent context with user_id
|
||||
parent_context = ha.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVAAA",
|
||||
user_id="b400facee45711eaa9308bfd3d19e474",
|
||||
)
|
||||
|
||||
# Climate state change with the parent context
|
||||
hass.states.async_set(
|
||||
"climate.living_room",
|
||||
"heat",
|
||||
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
|
||||
context=parent_context,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Child context WITHOUT user_id, no service call event
|
||||
child_context = ha.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVDDD",
|
||||
parent_id="01GTDGKBCH00GW0X476W5TVAAA",
|
||||
)
|
||||
|
||||
# Switch state change with the child context
|
||||
hass.states.async_set(
|
||||
"switch.heater",
|
||||
STATE_ON,
|
||||
{ATTR_FRIENDLY_NAME: "Heater"},
|
||||
context=child_context,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Climate updates again in response to switch state change
|
||||
hass.states.async_set(
|
||||
"climate.living_room",
|
||||
"heat",
|
||||
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
|
||||
context=child_context,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
client = await hass_client()
|
||||
|
||||
start = dt_util.utcnow().date()
|
||||
start_date = datetime(start.year, start.month, start.day, tzinfo=dt_util.UTC)
|
||||
end_time = start_date + timedelta(hours=24)
|
||||
|
||||
response = await client.get(
|
||||
f"/api/logbook/{start_date.isoformat()}",
|
||||
params={"end_time": end_time.isoformat()},
|
||||
)
|
||||
assert response.status == HTTPStatus.OK
|
||||
json_dict = await response.json()
|
||||
|
||||
# Switch state change should be attributed to the climate entity
|
||||
# and inherit user_id from the parent context
|
||||
heater_entries = [
|
||||
entry for entry in json_dict if entry.get("entity_id") == "switch.heater"
|
||||
]
|
||||
assert len(heater_entries) == 1
|
||||
|
||||
heater_entry = heater_entries[0]
|
||||
assert heater_entry["context_entity_id"] == "climate.living_room"
|
||||
assert heater_entry["context_entity_id_name"] == "Living Room Thermostat"
|
||||
assert heater_entry["context_state"] == "heat"
|
||||
assert heater_entry["context_user_id"] == "b400facee45711eaa9308bfd3d19e474"
|
||||
|
||||
|
||||
async def test_context_user_ids_lru_eviction(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Test that the parent context user-id cache is bounded by LRU eviction.
|
||||
|
||||
The cache must keep memory bounded under sustained load. New entries
|
||||
arriving after the cap evict the least recently used entries. An
|
||||
early parent context whose entry has been evicted should no longer
|
||||
contribute its user_id to a later child state change.
|
||||
"""
|
||||
user_id = "b400facee45711eaa9308bfd3d19e474"
|
||||
early_parent_context = ha.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVAAA",
|
||||
user_id=user_id,
|
||||
)
|
||||
child_context = ha.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVDDD",
|
||||
parent_id=early_parent_context.id,
|
||||
)
|
||||
|
||||
logbook_run = logbook.processor.LogbookRun(
|
||||
context_lookup={None: None},
|
||||
external_events={},
|
||||
event_cache=logbook.processor.EventCache({}),
|
||||
entity_name_cache=logbook.processor.EntityNameCache(hass),
|
||||
include_entity_name=True,
|
||||
timestamp=False,
|
||||
memoize_new_contexts=False,
|
||||
for_live_stream=True,
|
||||
)
|
||||
context_augmenter = logbook.processor.ContextAugmenter(logbook_run)
|
||||
ent_reg = er.async_get(hass)
|
||||
|
||||
processor = logbook.processor.EventProcessor.__new__(
|
||||
logbook.processor.EventProcessor
|
||||
)
|
||||
processor.hass = hass
|
||||
processor.ent_reg = ent_reg
|
||||
processor.logbook_run = logbook_run
|
||||
processor.context_augmenter = context_augmenter
|
||||
|
||||
hass.states.async_set("switch.heater", STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Seed: the early parent SERVICE_CALL event populates the cache.
|
||||
parent_row = MockRow(
|
||||
EVENT_CALL_SERVICE,
|
||||
{
|
||||
ATTR_DOMAIN: "climate",
|
||||
ATTR_SERVICE: "set_hvac_mode",
|
||||
"service_data": {ATTR_ENTITY_ID: "climate.living_room"},
|
||||
},
|
||||
context=early_parent_context,
|
||||
)
|
||||
parent_row.context_only = True
|
||||
parent_row.icon = None
|
||||
processor.humanify([parent_row])
|
||||
assert (
|
||||
ulid_to_bytes_or_none(early_parent_context.id) in logbook_run.context_user_ids
|
||||
)
|
||||
|
||||
# Flood the cache with MAX+1 unrelated parent contexts so the early
|
||||
# parent is evicted from the front of the LRU.
|
||||
filler_rows = []
|
||||
for index in range(logbook.processor.MAX_CONTEXT_USER_IDS_CACHE + 1):
|
||||
filler_context = ha.Context(
|
||||
user_id=f"ffffffff{index:024x}"[:32],
|
||||
)
|
||||
filler_row = MockRow(
|
||||
EVENT_CALL_SERVICE,
|
||||
{
|
||||
ATTR_DOMAIN: "test",
|
||||
ATTR_SERVICE: "noop",
|
||||
"service_data": {},
|
||||
},
|
||||
context=filler_context,
|
||||
)
|
||||
filler_row.context_only = True
|
||||
filler_row.icon = None
|
||||
filler_rows.append(filler_row)
|
||||
processor.humanify(filler_rows)
|
||||
|
||||
assert (
|
||||
len(logbook_run.context_user_ids)
|
||||
== logbook.processor.MAX_CONTEXT_USER_IDS_CACHE
|
||||
)
|
||||
assert (
|
||||
ulid_to_bytes_or_none(early_parent_context.id)
|
||||
not in logbook_run.context_user_ids
|
||||
)
|
||||
|
||||
# The child state change can no longer inherit the early parent's user_id
|
||||
# because that entry was evicted.
|
||||
child_row = MockRow(
|
||||
PSEUDO_EVENT_STATE_CHANGED,
|
||||
context=child_context,
|
||||
)
|
||||
child_row.state = STATE_ON
|
||||
child_row.entity_id = "switch.heater"
|
||||
child_row.icon = None
|
||||
results = processor.humanify([child_row])
|
||||
|
||||
heater_entries = [e for e in results if e.get("entity_id") == "switch.heater"]
|
||||
assert len(heater_entries) == 1
|
||||
assert "context_user_id" not in heater_entries[0]
|
||||
|
||||
|
||||
async def test_parent_user_attribution_does_not_use_origin_event_fallback(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Test that parent context lookup doesn't fall back to origin_event.
|
||||
|
||||
ContextAugmenter.get_context() has a fallback: when a context_id isn't in
|
||||
context_lookup, it returns async_event_to_row(row.context.origin_event).
|
||||
This fallback uses the *child row's* origin event, not the parent's,
|
||||
so it can attribute the wrong user_id to a child context.
|
||||
|
||||
In practice this scenario is unlikely — child contexts don't carry a
|
||||
user_id, so the origin_event fallback would return None for user_id
|
||||
anyway. We guard against it nevertheless to ensure the lookup is
|
||||
semantically correct: the parent context should only be resolved via
|
||||
context_lookup, never via an unrelated fallback path.
|
||||
|
||||
Scenario:
|
||||
- A user_id is set directly on child_context (not realistic, but
|
||||
exercises the fallback path).
|
||||
- Creating an Event with that context sets context.origin_event,
|
||||
which carries the user_id.
|
||||
- A state change for switch.heater uses that same child_context.
|
||||
- The parent context is NOT in context_lookup (simulating live stream).
|
||||
- The parent user_id should NOT be resolved via the origin_event fallback.
|
||||
"""
|
||||
wrong_user_id = "aaaaaaaaaaa711eaa9308bfd3d19e474"
|
||||
parent_context = Context(id="01GTDGKBCH00GW0X476W5TVAAA")
|
||||
|
||||
# Child context whose origin_event will carry wrong_user_id
|
||||
child_context = Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVDDD",
|
||||
parent_id=parent_context.id,
|
||||
user_id=wrong_user_id,
|
||||
)
|
||||
# Creating an Event sets context.origin_event = self, which carries
|
||||
# wrong_user_id via child_context.user_id.
|
||||
Event(EVENT_CALL_SERVICE, {}, context=child_context)
|
||||
assert child_context.origin_event is not None
|
||||
|
||||
hass.states.async_set("switch.heater", STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
logbook_run = logbook.processor.LogbookRun(
|
||||
context_lookup={None: None},
|
||||
external_events={},
|
||||
event_cache=logbook.processor.EventCache({}),
|
||||
entity_name_cache=logbook.processor.EntityNameCache(hass),
|
||||
include_entity_name=True,
|
||||
timestamp=False,
|
||||
memoize_new_contexts=False,
|
||||
)
|
||||
context_augmenter = logbook.processor.ContextAugmenter(logbook_run)
|
||||
ent_reg = er.async_get(hass)
|
||||
|
||||
processor = logbook.processor.EventProcessor.__new__(
|
||||
logbook.processor.EventProcessor
|
||||
)
|
||||
processor.hass = hass
|
||||
processor.ent_reg = ent_reg
|
||||
processor.logbook_run = logbook_run
|
||||
processor.context_augmenter = context_augmenter
|
||||
|
||||
# Build a child state-change EventAsRow with the child_context.
|
||||
# The row itself has no user_id (context_user_id_bin=None) but
|
||||
# the child_context.origin_event carries wrong_user_id.
|
||||
child_row = EventAsRow(
|
||||
row_id=1,
|
||||
event_type=PSEUDO_EVENT_STATE_CHANGED,
|
||||
event_data=None,
|
||||
time_fired_ts=dt_util.utcnow().timestamp(),
|
||||
context_id_bin=ulid_to_bytes_or_none(child_context.id),
|
||||
context_user_id_bin=None,
|
||||
context_parent_id_bin=ulid_to_bytes_or_none(child_context.parent_id),
|
||||
state=STATE_ON,
|
||||
entity_id="switch.heater",
|
||||
icon=None,
|
||||
context_only=False,
|
||||
data={},
|
||||
context=child_context,
|
||||
)
|
||||
|
||||
results = processor.humanify([child_row])
|
||||
|
||||
heater_entries = [e for e in results if e.get("entity_id") == "switch.heater"]
|
||||
assert len(heater_entries) == 1
|
||||
# The parent context is unknown — no user should be attributed.
|
||||
# If get_context's origin_event fallback is used, wrong_user_id leaks in.
|
||||
assert "context_user_id" not in heater_entries[0]
|
||||
|
||||
@@ -24,11 +24,13 @@ from homeassistant.const import (
|
||||
ATTR_ENTITY_ID,
|
||||
ATTR_FRIENDLY_NAME,
|
||||
ATTR_NAME,
|
||||
ATTR_SERVICE,
|
||||
ATTR_UNIT_OF_MEASUREMENT,
|
||||
CONF_DOMAINS,
|
||||
CONF_ENTITIES,
|
||||
CONF_EXCLUDE,
|
||||
CONF_INCLUDE,
|
||||
EVENT_CALL_SERVICE,
|
||||
EVENT_HOMEASSISTANT_FINAL_WRITE,
|
||||
EVENT_HOMEASSISTANT_START,
|
||||
STATE_OFF,
|
||||
@@ -41,6 +43,12 @@ from homeassistant.helpers.event import async_track_state_change_event
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from .common import (
|
||||
assert_thermostat_context_chain_events,
|
||||
setup_thermostat_context_test_entities,
|
||||
simulate_thermostat_context_chain,
|
||||
)
|
||||
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
from tests.components.recorder.common import (
|
||||
async_block_recorder,
|
||||
@@ -3202,3 +3210,420 @@ async def test_consistent_stream_and_recorder_filtering(
|
||||
|
||||
results = response["result"]
|
||||
assert len(results) == result_count
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_stream_user_id_from_parent_context(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
|
||||
) -> None:
|
||||
"""Test user attribution from parent context in live event stream.
|
||||
|
||||
Simulates the generic_thermostat pattern where a child context
|
||||
(no user_id) is created for the heater service call, while the
|
||||
parent context (from the user's set_hvac_mode call) has the user_id.
|
||||
|
||||
The live stream uses memoize_new_contexts=False, so context_lookup
|
||||
is empty. User_id must be resolved via the context_user_ids map.
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
setup_thermostat_context_test_entities(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
now = dt_util.utcnow()
|
||||
websocket_client = await hass_ws_client()
|
||||
await websocket_client.send_json(
|
||||
{"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()}
|
||||
)
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == TYPE_RESULT
|
||||
assert msg["success"]
|
||||
|
||||
# Receive historical events (partial) and sync message
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["partial"] is True
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["events"] == []
|
||||
|
||||
# Simulate the full generic_thermostat chain as live events
|
||||
parent_context, _ = simulate_thermostat_context_chain(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
|
||||
assert_thermostat_context_chain_events(msg["event"]["events"], parent_context)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_stream_user_id_from_parent_context_filtered(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
|
||||
) -> None:
|
||||
"""Test user attribution from parent context in filtered live event stream.
|
||||
|
||||
Same scenario as test_logbook_stream_user_id_from_parent_context but
|
||||
with entity_ids in the subscription, matching what the frontend does.
|
||||
This exercises the filtered event subscription path where
|
||||
EVENT_CALL_SERVICE must be explicitly included and matched via
|
||||
service_data.
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
setup_thermostat_context_test_entities(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
now = dt_util.utcnow()
|
||||
websocket_client = await hass_ws_client()
|
||||
# Subscribe with entity_ids, matching what the frontend logbook card does
|
||||
end_time = now + timedelta(hours=3)
|
||||
await websocket_client.send_json(
|
||||
{
|
||||
"id": 7,
|
||||
"type": "logbook/event_stream",
|
||||
"start_time": now.isoformat(),
|
||||
"end_time": end_time.isoformat(),
|
||||
"entity_ids": ["climate.living_room", "switch.heater"],
|
||||
}
|
||||
)
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == TYPE_RESULT
|
||||
assert msg["success"]
|
||||
|
||||
# Receive historical events (partial) and sync message
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["partial"] is True
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["events"] == []
|
||||
|
||||
# Simulate the full chain as live events
|
||||
parent_context, _ = simulate_thermostat_context_chain(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
|
||||
assert_thermostat_context_chain_events(msg["event"]["events"], parent_context)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_stream_parent_context_bridges_historical_to_live(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
|
||||
) -> None:
|
||||
"""Test parent-context user attribution bridges the historical→live switch.
|
||||
|
||||
Scenario: a user fires a service call (parent context) that triggers a
|
||||
child state change BEFORE the websocket subscription is opened. The
|
||||
parent's call_service event lives only in the historical window. After
|
||||
the historical backfill completes and the stream switches to live, a
|
||||
NEW state change reusing the same child context (whose parent_id points
|
||||
back at the historical parent) fires. The live event must inherit the
|
||||
user_id from the historical parent — which can only happen if the
|
||||
historical pre-pass populated the persistent LRU cache so the live
|
||||
consumer can find it.
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
setup_thermostat_context_test_entities(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
user_id = "b400facee45711eaa9308bfd3d19e474"
|
||||
parent_context = core.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVAAA",
|
||||
user_id=user_id,
|
||||
)
|
||||
child_context = core.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVDDD",
|
||||
parent_id=parent_context.id,
|
||||
)
|
||||
|
||||
# Fire the parent service call and the first child state change BEFORE
|
||||
# the websocket subscription. These will live in the historical window.
|
||||
start_time = dt_util.utcnow()
|
||||
hass.bus.async_fire(
|
||||
EVENT_CALL_SERVICE,
|
||||
{
|
||||
ATTR_DOMAIN: "climate",
|
||||
ATTR_SERVICE: "set_hvac_mode",
|
||||
"service_data": {ATTR_ENTITY_ID: "climate.living_room"},
|
||||
},
|
||||
context=parent_context,
|
||||
)
|
||||
hass.bus.async_fire(
|
||||
EVENT_CALL_SERVICE,
|
||||
{
|
||||
ATTR_DOMAIN: "homeassistant",
|
||||
ATTR_SERVICE: "turn_on",
|
||||
"service_data": {ATTR_ENTITY_ID: "switch.heater"},
|
||||
},
|
||||
context=child_context,
|
||||
)
|
||||
hass.states.async_set(
|
||||
"switch.heater",
|
||||
STATE_ON,
|
||||
{ATTR_FRIENDLY_NAME: "Heater"},
|
||||
context=child_context,
|
||||
)
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
# Open a filtered subscription. The filtered query path excludes the
|
||||
# parent's set_hvac_mode call_service from the historical row stream
|
||||
# because its event_data references climate.living_room, not
|
||||
# switch.heater. The pre-pass must fetch the parent and populate the
|
||||
# persistent LRU so the upcoming live event can resolve attribution.
|
||||
websocket_client = await hass_ws_client()
|
||||
await websocket_client.send_json(
|
||||
{
|
||||
"id": 7,
|
||||
"type": "logbook/event_stream",
|
||||
"start_time": start_time.isoformat(),
|
||||
"entity_ids": ["switch.heater"],
|
||||
}
|
||||
)
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == TYPE_RESULT
|
||||
assert msg["success"]
|
||||
|
||||
# Drain the historical backfill messages.
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["partial"] is True
|
||||
historical_events = msg["event"]["events"]
|
||||
historical_heater = [
|
||||
e for e in historical_events if e.get("entity_id") == "switch.heater"
|
||||
]
|
||||
assert len(historical_heater) == 1
|
||||
assert historical_heater[0]["context_user_id"] == user_id
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["events"] == []
|
||||
|
||||
# Stream is now live. Fire a NEW switch.heater state change reusing
|
||||
# child_context — its parent_id still points at the historical parent.
|
||||
# The live consumer must resolve the user_id via the persistent LRU
|
||||
# populated during the historical pre-pass.
|
||||
hass.states.async_set(
|
||||
"switch.heater",
|
||||
STATE_OFF,
|
||||
{ATTR_FRIENDLY_NAME: "Heater"},
|
||||
context=child_context,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
live_heater = [
|
||||
e for e in msg["event"]["events"] if e.get("entity_id") == "switch.heater"
|
||||
]
|
||||
assert len(live_heater) == 1
|
||||
assert live_heater[0]["state"] == "off"
|
||||
assert live_heater[0]["context_user_id"] == user_id
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_get_events_user_id_from_parent_context(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
|
||||
) -> None:
|
||||
"""Test user attribution from parent context in unfiltered historical logbook.
|
||||
|
||||
Uses logbook/get_events without entity_ids, which triggers the
|
||||
unfiltered SQL query path.
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
setup_thermostat_context_test_entities(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
now = dt_util.utcnow()
|
||||
|
||||
parent_context, _ = simulate_thermostat_context_chain(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
websocket_client = await hass_ws_client()
|
||||
await websocket_client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"type": "logbook/get_events",
|
||||
"start_time": now.isoformat(),
|
||||
}
|
||||
)
|
||||
response = await websocket_client.receive_json()
|
||||
assert response["success"]
|
||||
|
||||
assert_thermostat_context_chain_events(response["result"], parent_context)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_get_events_user_id_from_parent_context_filtered(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
|
||||
) -> None:
|
||||
"""Test user attribution from parent context in historical logbook with entity filter.
|
||||
|
||||
Uses logbook/get_events with entity_ids, which triggers the filtered
|
||||
SQL query path. The query must also fetch parent context rows so that
|
||||
user_id can be inherited from the parent context.
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
setup_thermostat_context_test_entities(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
now = dt_util.utcnow()
|
||||
|
||||
parent_context, _ = simulate_thermostat_context_chain(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
|
||||
websocket_client = await hass_ws_client()
|
||||
await websocket_client.send_json(
|
||||
{
|
||||
"id": 1,
|
||||
"type": "logbook/get_events",
|
||||
"start_time": now.isoformat(),
|
||||
"entity_ids": ["climate.living_room", "switch.heater"],
|
||||
}
|
||||
)
|
||||
response = await websocket_client.receive_json()
|
||||
assert response["success"]
|
||||
|
||||
assert_thermostat_context_chain_events(response["result"], parent_context)
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("recorder_mock")
|
||||
async def test_logbook_stream_live_parent_service_call_only(
|
||||
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
|
||||
) -> None:
|
||||
"""Test user attribution when parent context only appears on a service call.
|
||||
|
||||
In the thermostat pattern, the parent context also appears on a state
|
||||
change for climate.living_room. This test covers the case where the
|
||||
parent context ONLY fires a call_service event (no state change with
|
||||
the parent context for any subscribed entity). The live consumer must
|
||||
still resolve the child's user_id from the parent's call_service event.
|
||||
|
||||
This fails if EVENT_CALL_SERVICE is not subscribed to in the live stream.
|
||||
"""
|
||||
await asyncio.gather(
|
||||
*[
|
||||
async_setup_component(hass, comp, {})
|
||||
for comp in ("homeassistant", "logbook")
|
||||
]
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
hass.states.async_set("switch.heater", STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
await async_wait_recording_done(hass)
|
||||
now = dt_util.utcnow()
|
||||
websocket_client = await hass_ws_client()
|
||||
end_time = now + timedelta(hours=3)
|
||||
await websocket_client.send_json(
|
||||
{
|
||||
"id": 7,
|
||||
"type": "logbook/event_stream",
|
||||
"start_time": now.isoformat(),
|
||||
"end_time": end_time.isoformat(),
|
||||
"entity_ids": ["switch.heater"],
|
||||
}
|
||||
)
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == TYPE_RESULT
|
||||
assert msg["success"]
|
||||
|
||||
# Drain historical backfill
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["partial"] is True
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["event"]["events"] == []
|
||||
|
||||
# Stream is now live. Fire a parent service call (no state change with
|
||||
# the parent context) followed by a child state change.
|
||||
user_id = "b400facee45711eaa9308bfd3d19e474"
|
||||
parent_context = core.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVAAA",
|
||||
user_id=user_id,
|
||||
)
|
||||
child_context = core.Context(
|
||||
id="01GTDGKBCH00GW0X476W5TVDDD",
|
||||
parent_id=parent_context.id,
|
||||
)
|
||||
|
||||
# Only the service call carries the parent context — no state change
|
||||
# with parent_context for any subscribed entity.
|
||||
hass.bus.async_fire(
|
||||
EVENT_CALL_SERVICE,
|
||||
{
|
||||
ATTR_DOMAIN: "homeassistant",
|
||||
ATTR_SERVICE: "turn_on",
|
||||
"service_data": {ATTR_ENTITY_ID: "switch.heater"},
|
||||
},
|
||||
context=parent_context,
|
||||
)
|
||||
|
||||
# Child state change with no user_id on its context
|
||||
hass.states.async_set(
|
||||
"switch.heater",
|
||||
STATE_ON,
|
||||
{ATTR_FRIENDLY_NAME: "Heater"},
|
||||
context=child_context,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
|
||||
assert msg["id"] == 7
|
||||
assert msg["type"] == "event"
|
||||
|
||||
heater_entries = [
|
||||
e for e in msg["event"]["events"] if e.get("entity_id") == "switch.heater"
|
||||
]
|
||||
assert len(heater_entries) == 1
|
||||
assert heater_entries[0]["state"] == "on"
|
||||
assert heater_entries[0]["context_user_id"] == user_id
|
||||
|
||||
Reference in New Issue
Block a user