Compare commits

...

25 Commits

Author SHA1 Message Date
Erik
d91d05750a Revert "Don't queue service call events when not needed"
This reverts commit 6438c44895.
2026-04-13 20:00:37 +02:00
Erik
6438c44895 Don't queue service call events when not needed 2026-04-13 15:15:53 +02:00
Erik
8b5310efe9 Address comment 2026-04-13 09:52:57 +02:00
Erik
46900109e0 Revert unrelated change 2026-04-13 08:21:29 +02:00
J. Nick Koston
604c1c44e8 fix test 2026-04-11 12:15:36 -10:00
J. Nick Koston
fc8170ba12 doubled query, double bind 2026-04-11 12:13:04 -10:00
J. Nick Koston
231b69d433 note to future self 2026-04-11 12:08:53 -10:00
J. Nick Koston
1057cee988 fix refactoring error 2026-04-11 12:07:36 -10:00
J. Nick Koston
804ac0c6a0 bot comments 2026-04-11 12:03:04 -10:00
J. Nick Koston
7cd2d67cca preen 2026-04-11 12:00:37 -10:00
J. Nick Koston
f987b1485f move to queries 2026-04-11 11:56:34 -10:00
J. Nick Koston
e7adeb616a preen 2026-04-11 11:49:51 -10:00
J. Nick Koston
943bfabf6d preen 2026-04-11 11:45:52 -10:00
J. Nick Koston
c5a6b9c002 address bot comments 2026-04-11 11:44:46 -10:00
J. Nick Koston
acee1d48f1 avoid wasted work 2026-04-11 11:38:21 -10:00
J. Nick Koston
baa103acc5 cover live swithc 2026-04-11 11:31:24 -10:00
J. Nick Koston
3957e409ca write both for historical 2026-04-11 11:28:28 -10:00
J. Nick Koston
cc7e4422fd fix copilot comments 2026-04-11 11:21:18 -10:00
J. Nick Koston
99d166e6e8 use lru-dict 2026-04-11 11:17:47 -10:00
J. Nick Koston
545ead149a fix concurrent mutation issue 2026-04-11 11:03:42 -10:00
J. Nick Koston
42c85b9863 bot comment 2026-04-11 11:01:55 -10:00
J. Nick Koston
93b1401e4b post query scan to fill to avoid sparse query 2026-04-11 10:55:40 -10:00
J. Nick Koston
278a3b4928 Merge branch 'dev' into improve_logbook_parent_context_handling 2026-04-11 10:19:33 -10:00
Erik
f69b0de2c7 Adjust 2026-04-10 12:44:14 +02:00
Erik
4cd93b0e7c Improve logbook parent context handling 2026-04-01 08:49:37 +02:00
8 changed files with 1059 additions and 18 deletions

View File

@@ -11,7 +11,9 @@ from homeassistant.const import (
ATTR_DEVICE_ID,
ATTR_DOMAIN,
ATTR_ENTITY_ID,
ATTR_SERVICE_DATA,
ATTR_UNIT_OF_MEASUREMENT,
EVENT_CALL_SERVICE,
EVENT_LOGBOOK_ENTRY,
EVENT_STATE_CHANGED,
)
@@ -104,10 +106,22 @@ def async_determine_event_types(
@callback
def extract_attr(
    event_type: EventType[Any] | str, source: Mapping[str, Any], attr: str
) -> list[str]:
    """Extract an attribute as a list of strings.

    For EVENT_CALL_SERVICE events, the entity_id is inside service_data,
    not at the top level. Check service_data as a fallback.

    Returns [] when the attribute is absent everywhere; a str value is
    split on "," so comma-separated entity lists become real lists.
    """
    # The service_data fallback must be nested under the "top-level value
    # missing" branch: a present top-level value is always used as-is, and
    # only call_service events get the secondary lookup.
    if (value := source.get(attr)) is None:
        # Early return to avoid unnecessary dict lookups for non-service
        # events — only call_service payloads nest the attr in service_data.
        if event_type != EVENT_CALL_SERVICE:
            return []
        if service_data := source.get(ATTR_SERVICE_DATA):
            value = service_data.get(attr)
        if value is None:
            return []
    if isinstance(value, list):
        return value
    return str(value).split(",")
@@ -135,7 +149,7 @@ def event_forwarder_filtered(
def _forward_events_filtered_by_entities_filter(event: Event) -> None:
assert entities_filter is not None
event_data = event.data
entity_ids = extract_attr(event_data, ATTR_ENTITY_ID)
entity_ids = extract_attr(event.event_type, event_data, ATTR_ENTITY_ID)
if entity_ids and not any(
entities_filter(entity_id) for entity_id in entity_ids
):
@@ -157,9 +171,12 @@ def event_forwarder_filtered(
@callback
def _forward_events_filtered_by_device_entity_ids(event: Event) -> None:
event_data = event.data
event_type = event.event_type
if entity_ids_set.intersection(
extract_attr(event_data, ATTR_ENTITY_ID)
) or device_ids_set.intersection(extract_attr(event_data, ATTR_DEVICE_ID)):
extract_attr(event_type, event_data, ATTR_ENTITY_ID)
) or device_ids_set.intersection(
extract_attr(event_type, event_data, ATTR_DEVICE_ID)
):
target(event)
return _forward_events_filtered_by_device_entity_ids

View File

@@ -162,7 +162,10 @@ def async_event_to_row(event: Event) -> EventAsRow:
# that are missing new_state or old_state
# since the logbook does not show these
new_state: State = event.data["new_state"]
context = new_state.context
# Use the event's context rather than the state's context because
# State.expire() replaces the context with a copy that loses
# origin_event, which is needed for context augmentation.
context = event.context
return EventAsRow(
row_id=hash(event),
event_type=None,

View File

@@ -3,14 +3,16 @@
from __future__ import annotations
from collections.abc import Callable, Generator, Sequence
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime as dt
import logging
import time
from typing import TYPE_CHECKING, Any
from lru import LRU
from sqlalchemy.engine import Result
from sqlalchemy.engine.row import Row
from sqlalchemy.orm import Session
from homeassistant.components.recorder import get_instance
from homeassistant.components.recorder.filters import Filters
@@ -37,6 +39,7 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import entity_registry as er
from homeassistant.util import dt as dt_util
from homeassistant.util.collection import chunked_or_all
from homeassistant.util.event_type import EventType
from .const import (
@@ -80,10 +83,18 @@ from .models import (
async_event_to_row,
)
from .queries import statement_for_request
from .queries.common import PSEUDO_EVENT_STATE_CHANGED
from .queries.common import (
PSEUDO_EVENT_STATE_CHANGED,
select_context_user_ids_for_context_ids,
)
_LOGGER = logging.getLogger(__name__)
# Bound for the parent-context user-id cache — only needs to bridge the
# historical→live handoff, so the in-flight set is realistically ~tens with
# peak bursts of ~100. Ceiling bounds memory in pathological cases.
MAX_CONTEXT_USER_IDS_CACHE = 256
@dataclass(slots=True)
class LogbookRun:
@@ -99,6 +110,14 @@ class LogbookRun:
include_entity_name: bool
timestamp: bool
memoize_new_contexts: bool = True
# True when this run will switch to a live stream; gates population of
# context_user_ids (wasted work for one-shot REST/get_events callers).
for_live_stream: bool = False
# context_id -> user_id for parent context attribution; persisted across
# batches so child rows can inherit user_id from a parent seen earlier.
context_user_ids: LRU[bytes, bytes] = field(
default_factory=lambda: LRU(MAX_CONTEXT_USER_IDS_CACHE)
)
class EventProcessor:
@@ -113,6 +132,7 @@ class EventProcessor:
context_id: str | None = None,
timestamp: bool = False,
include_entity_name: bool = True,
for_live_stream: bool = False,
) -> None:
"""Init the event stream."""
assert not (context_id and (entity_ids or device_ids)), (
@@ -133,6 +153,7 @@ class EventProcessor:
entity_name_cache=EntityNameCache(self.hass),
include_entity_name=include_entity_name,
timestamp=timestamp,
for_live_stream=for_live_stream,
)
self.context_augmenter = ContextAugmenter(self.logbook_run)
@@ -180,13 +201,67 @@ class EventProcessor:
self.filters,
self.context_id,
)
return self.humanify(
execute_stmt_lambda_element(session, stmt, orm_rows=False)
rows = execute_stmt_lambda_element(session, stmt, orm_rows=False)
query_parent_user_ids: dict[bytes, bytes] | None = None
if self.entity_ids or self.device_ids:
# Filtered queries exclude parent call_service rows for
# unrelated targets, so child contexts lose user attribution
# without a pre-pass. all_stmt already includes them.
rows = list(rows)
query_parent_user_ids = self._fetch_parent_user_ids(
session, rows, instance.max_bind_vars
)
return self.humanify(rows, query_parent_user_ids)
def _fetch_parent_user_ids(
self,
session: Session,
rows: list[Row],
max_bind_vars: int,
) -> dict[bytes, bytes] | None:
"""Resolve parent-context user_ids for rows in a filtered query.
Done in Python rather than as a SQL union branch because the
context_parent_id_bin column is sparsely populated — scanning the
States table for non-null parents costs ~40% of the overall query
on real datasets. Here we collect only the parent ids we actually
need and fetch them via an indexed point-lookup on context_id_bin.
Returns None when every needed parent id is already cached (or no
row has a parent), so the caller can skip the merge entirely.
"""
# Bounded LRU shared across batches of this LogbookRun.
cache = self.logbook_run.context_user_ids
pending: set[bytes] = {
parent_id
for row in rows
if (parent_id := row[CONTEXT_PARENT_ID_BIN_POS]) and parent_id not in cache
}
if not pending:
return None
query_parent_user_ids: dict[bytes, bytes] = {}
# The lambda statement unions events and states, so each id appears
# in two IN clauses — halve the chunk size to stay under the
# database's max bind variable count.
for pending_chunk in chunked_or_all(pending, max_bind_vars // 2):
# Schema allows NULL but the query's WHERE clauses exclude it;
# explicit checks satisfy the type checker.
query_parent_user_ids.update(
{
parent_id: user_id
for parent_id, user_id in execute_stmt_lambda_element(
session,
select_context_user_ids_for_context_ids(pending_chunk),
orm_rows=False,
)
if parent_id is not None and user_id is not None
}
)
# One-shot runs discard the LogbookRun afterwards, so writing the
# LRU would be wasted work; only live streams reuse it.
if self.logbook_run.for_live_stream:
cache.update(query_parent_user_ids)
return query_parent_user_ids
def humanify(
self, rows: Generator[EventAsRow] | Sequence[Row] | Result
) -> list[dict[str, str]]:
self,
rows: Generator[EventAsRow] | Sequence[Row] | Result,
query_parent_user_ids: dict[bytes, bytes] | None = None,
) -> list[dict[str, Any]]:
"""Humanify rows."""
return list(
_humanify(
@@ -195,6 +270,7 @@ class EventProcessor:
self.ent_reg,
self.logbook_run,
self.context_augmenter,
query_parent_user_ids,
)
)
@@ -205,6 +281,7 @@ def _humanify(
ent_reg: er.EntityRegistry,
logbook_run: LogbookRun,
context_augmenter: ContextAugmenter,
query_parent_user_ids: dict[bytes, bytes] | None,
) -> Generator[dict[str, Any]]:
"""Generate a converted list of events into entries."""
# Continuous sensors, will be excluded from the logbook
@@ -220,11 +297,21 @@ def _humanify(
context_id_bin: bytes
data: dict[str, Any]
context_user_ids = logbook_run.context_user_ids
# Skip the LRU write on one-shot runs — the LogbookRun is discarded.
populate_context_user_ids = logbook_run.for_live_stream
# Process rows
for row in rows:
context_id_bin = row[CONTEXT_ID_BIN_POS]
if memoize_new_contexts and context_id_bin not in context_lookup:
context_lookup[context_id_bin] = row
if (
populate_context_user_ids
and (context_user_id_bin := row[CONTEXT_USER_ID_BIN_POS])
and context_id_bin not in context_user_ids
):
context_user_ids[context_id_bin] = context_user_id_bin
if row[CONTEXT_ONLY_POS]:
continue
event_type = row[EVENT_TYPE_POS]
@@ -307,6 +394,28 @@ def _humanify(
):
context_augmenter.augment(data, context_row)
# Fall back to the parent context for child contexts that inherit
# user attribution (e.g., generic_thermostat -> switch turn_on).
# Read from context_lookup directly instead of get_context() to
# avoid the origin_event fallback which would return the *child*
# row's origin event, not the parent's.
if CONTEXT_USER_ID not in data and (
context_parent_id_bin := row[CONTEXT_PARENT_ID_BIN_POS]
):
parent_user_id_bin: bytes | None = context_user_ids.get(
context_parent_id_bin
)
if parent_user_id_bin is None and query_parent_user_ids is not None:
parent_user_id_bin = query_parent_user_ids.get(context_parent_id_bin)
if (
parent_user_id_bin is None
and (parent_row := context_lookup.get(context_parent_id_bin))
is not None
):
parent_user_id_bin = parent_row[CONTEXT_USER_ID_BIN_POS]
if parent_user_id_bin:
data[CONTEXT_USER_ID] = bytes_to_uuid_hex_or_none(parent_user_id_bin)
yield data

View File

@@ -2,12 +2,14 @@
from __future__ import annotations
from collections.abc import Collection
from typing import Final
import sqlalchemy
from sqlalchemy import select
from sqlalchemy import lambda_stmt, select, union_all
from sqlalchemy.sql.elements import BooleanClauseList, ColumnElement
from sqlalchemy.sql.expression import literal
from sqlalchemy.sql.lambdas import StatementLambdaElement
from sqlalchemy.sql.selectable import Select
from homeassistant.components.recorder.db_schema import (
@@ -122,6 +124,26 @@ def select_events_context_id_subquery(
)
def select_context_user_ids_for_context_ids(
    context_ids: Collection[bytes],
) -> StatementLambdaElement:
    """Select (context_id_bin, context_user_id_bin) for the given context ids.

    A parent context can originate from either the events or the states
    table (e.g., a state set directly via the API), so both tables are
    queried and combined with UNION ALL. Rows whose user id is NULL are
    filtered out in SQL since they cannot contribute attribution.
    """
    # Both selects are built inside the lambda so SQLAlchemy's lambda
    # statement can track `context_ids` as a closure variable.
    return lambda_stmt(
        lambda: union_all(
            select(Events.context_id_bin, Events.context_user_id_bin).where(
                Events.context_id_bin.in_(context_ids),
                Events.context_user_id_bin.is_not(None),
            ),
            select(States.context_id_bin, States.context_user_id_bin).where(
                States.context_id_bin.in_(context_ids),
                States.context_user_id_bin.is_not(None),
            ),
        )
    )
def select_events_context_only() -> Select:
"""Generate an events query that mark them as for context_only.

View File

@@ -14,11 +14,13 @@ import voluptuous as vol
from homeassistant.components import websocket_api
from homeassistant.components.recorder import get_instance
from homeassistant.components.websocket_api import ActiveConnection, messages
from homeassistant.const import EVENT_CALL_SERVICE
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant, callback
from homeassistant.helpers.event import async_track_point_in_utc_time
from homeassistant.helpers.json import json_bytes
from homeassistant.util import dt as dt_util
from homeassistant.util.async_ import create_eager_task
from homeassistant.util.event_type import EventType
from .const import DOMAIN
from .helpers import (
@@ -289,6 +291,8 @@ async def ws_event_stream(
return
event_types = async_determine_event_types(hass, entity_ids, device_ids)
# A past end_time makes this a one-shot fetch that never goes live.
will_go_live = not (end_time and end_time <= utc_now)
event_processor = EventProcessor(
hass,
event_types,
@@ -297,6 +301,7 @@ async def ws_event_stream(
None,
timestamp=True,
include_entity_name=False,
for_live_stream=will_go_live,
)
if end_time and end_time <= utc_now:
@@ -357,11 +362,20 @@ async def ws_event_stream(
logbook_config: LogbookConfig = hass.data[DOMAIN]
entities_filter = logbook_config.entity_filter
# Live subscription needs call_service events so the live consumer can
# cache parent user_ids as they fire. Historical queries don't — the
# context_only join fetches them by context_id regardless of type.
# Unfiltered streams already include it via BUILT_IN_EVENTS.
live_event_types: tuple[EventType[Any] | str, ...] = (
event_types
if EVENT_CALL_SERVICE in event_types
else (*event_types, EVENT_CALL_SERVICE)
)
async_subscribe_events(
hass,
subscriptions,
_queue_or_cancel,
event_types,
live_event_types,
entities_filter,
entity_ids,
device_ids,

View File

@@ -13,7 +13,16 @@ from homeassistant.components.recorder.models import (
ulid_to_bytes_or_none,
uuid_hex_to_bytes_or_none,
)
from homeassistant.core import Context
from homeassistant.const import (
ATTR_DOMAIN,
ATTR_ENTITY_ID,
ATTR_FRIENDLY_NAME,
ATTR_SERVICE,
EVENT_CALL_SERVICE,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import Context, HomeAssistant
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.json import JSONEncoder
from homeassistant.util import dt as dt_util
@@ -65,6 +74,97 @@ class MockRow:
return process_timestamp_to_utc_isoformat(self.time_fired)
def setup_thermostat_context_test_entities(hass_: HomeAssistant) -> None:
    """Set up initial states for the thermostat context chain test entities.

    Seeds climate.living_room (off) and switch.heater (off) so that the
    simulated chain's later writes are real state transitions.
    """
    set_state = hass_.states.async_set
    set_state(
        "climate.living_room",
        "off",
        {ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
    )
    set_state("switch.heater", STATE_OFF)
def simulate_thermostat_context_chain(
hass_: HomeAssistant,
user_id: str = "b400facee45711eaa9308bfd3d19e474",
) -> tuple[Context, Context]:
"""Simulate the generic_thermostat context chain.
Fires events in the realistic order:
1. EVENT_CALL_SERVICE for set_hvac_mode (parent context)
2. EVENT_CALL_SERVICE for homeassistant.turn_on (child context)
3. Climate state changes off → heat (parent context)
4. Switch state changes off → on (child context)
Returns the (parent_context, child_context) tuple.
"""
parent_context = Context(
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id=user_id,
)
# Deliberately created without a user_id: tests verify attribution is
# inherited from the parent via parent_id.
child_context = Context(
id="01GTDGKBCH00GW0X476W5TVDDD",
parent_id=parent_context.id,
)
hass_.bus.async_fire(
EVENT_CALL_SERVICE,
{
ATTR_DOMAIN: "climate",
ATTR_SERVICE: "set_hvac_mode",
"service_data": {ATTR_ENTITY_ID: "climate.living_room"},
},
context=parent_context,
)
hass_.bus.async_fire(
EVENT_CALL_SERVICE,
{
ATTR_DOMAIN: "homeassistant",
ATTR_SERVICE: "turn_on",
"service_data": {ATTR_ENTITY_ID: "switch.heater"},
},
context=child_context,
)
hass_.states.async_set(
"climate.living_room",
"heat",
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
context=parent_context,
)
hass_.states.async_set(
"switch.heater",
STATE_ON,
{ATTR_FRIENDLY_NAME: "Heater"},
context=child_context,
)
return parent_context, child_context
def assert_thermostat_context_chain_events(
    events: list[dict[str, Any]], parent_context: Context
) -> None:
    """Assert the logbook events for a thermostat context chain.

    Verifies that climate and switch state changes have correct
    state, user attribution, and service call context.
    """
    # (entity_id, expected state, context domain, context service)
    expectations = (
        ("climate.living_room", "heat", "climate", "set_hvac_mode"),
        ("switch.heater", "on", "homeassistant", "turn_on"),
    )
    for entity_id, expected_state, domain, service in expectations:
        matching = [e for e in events if e.get("entity_id") == entity_id]
        assert len(matching) == 1
        entry = matching[0]
        assert entry["state"] == expected_state
        # Both entries inherit the user from the parent context.
        assert entry["context_user_id"] == parent_context.user_id
        assert entry["context_event_type"] == EVENT_CALL_SERVICE
        assert entry["context_domain"] == domain
        assert entry["context_service"] == service
def mock_humanify(hass_, rows):
"""Wrap humanify with mocked logbook objects."""
entity_name_cache = processor.EntityNameCache(hass_)
@@ -89,5 +189,6 @@ def mock_humanify(hass_, rows):
ent_reg,
logbook_run,
context_augmenter,
None,
),
)

View File

@@ -20,6 +20,7 @@ from homeassistant.components.logbook.models import EventAsRow, LazyEventPartial
from homeassistant.components.logbook.processor import EventProcessor
from homeassistant.components.logbook.queries.common import PSEUDO_EVENT_STATE_CHANGED
from homeassistant.components.recorder import Recorder
from homeassistant.components.recorder.models import ulid_to_bytes_or_none
from homeassistant.components.script import EVENT_SCRIPT_STARTED
from homeassistant.components.sensor import SensorStateClass
from homeassistant.const import (
@@ -41,13 +42,19 @@ from homeassistant.const import (
STATE_OFF,
STATE_ON,
)
from homeassistant.core import Event, HomeAssistant
from homeassistant.core import Context, Event, HomeAssistant
from homeassistant.helpers import device_registry as dr, entity_registry as er
from homeassistant.helpers.entityfilter import CONF_ENTITY_GLOBS
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from .common import MockRow, mock_humanify
from .common import (
MockRow,
assert_thermostat_context_chain_events,
mock_humanify,
setup_thermostat_context_test_entities,
simulate_thermostat_context_chain,
)
from tests.common import MockConfigEntry, async_capture_events, mock_platform
from tests.components.recorder.common import (
@@ -3002,3 +3009,346 @@ async def test_logbook_with_non_iterable_entity_filter(hass: HomeAssistant) -> N
},
)
await hass.async_block_till_done()
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_user_id_from_parent_context(
hass: HomeAssistant, hass_client: ClientSessionGenerator
) -> None:
"""Test user attribution is inherited through the full context chain.
Simulates the generic_thermostat pattern:
1. User calls set_hvac_mode → parent context (has user_id)
- Climate state changes off → heat (parent context)
2. Thermostat calls homeassistant.turn_on → child context (no user_id)
- SERVICE_CALL event fired (child context)
3. Switch state changes off → on (child context)
All entries should have user_id attributed, either directly (step 1)
or inherited from the parent context (steps 2-3).
"""
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
await async_recorder_block_till_done(hass)
setup_thermostat_context_test_entities(hass)
await hass.async_block_till_done()
parent_context, _ = simulate_thermostat_context_chain(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
# One-shot REST fetch covering the whole current day (no live stream).
client = await hass_client()
start = dt_util.utcnow().date()
start_date = datetime(start.year, start.month, start.day, tzinfo=dt_util.UTC)
end_time = start_date + timedelta(hours=24)
response = await client.get(
f"/api/logbook/{start_date.isoformat()}",
params={"end_time": end_time.isoformat()},
)
assert response.status == HTTPStatus.OK
json_dict = await response.json()
assert_thermostat_context_chain_events(json_dict, parent_context)
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_user_id_from_parent_context_state_changes_only(
hass: HomeAssistant, hass_client: ClientSessionGenerator
) -> None:
"""Test user attribution is inherited when only state changes are present.
Same chain as the full test but without the EVENT_CALL_SERVICE event.
This exercises the code path where context_lookup resolves the child
context to the state change row itself, and augment walks up to the
parent state change.
"""
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
await async_recorder_block_till_done(hass)
# Set initial states so that subsequent changes are real state transitions
hass.states.async_set(
"climate.living_room",
"off",
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
)
hass.states.async_set("switch.heater", STATE_OFF)
await hass.async_block_till_done()
# Parent context with user_id
parent_context = ha.Context(
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id="b400facee45711eaa9308bfd3d19e474",
)
# Climate state change with the parent context
hass.states.async_set(
"climate.living_room",
"heat",
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
context=parent_context,
)
await hass.async_block_till_done()
# Child context WITHOUT user_id, no service call event
child_context = ha.Context(
id="01GTDGKBCH00GW0X476W5TVDDD",
parent_id="01GTDGKBCH00GW0X476W5TVAAA",
)
# Switch state change with the child context
hass.states.async_set(
"switch.heater",
STATE_ON,
{ATTR_FRIENDLY_NAME: "Heater"},
context=child_context,
)
await hass.async_block_till_done()
# Climate updates again in response to switch state change
# (gives the switch row a sibling state-change row under the same
# child context, exercising context_state resolution).
hass.states.async_set(
"climate.living_room",
"heat",
{ATTR_FRIENDLY_NAME: "Living Room Thermostat"},
context=child_context,
)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
# NOTE(review): builds states inline instead of using the shared helper —
# intentional, since this variant must omit the service-call events.
client = await hass_client()
start = dt_util.utcnow().date()
start_date = datetime(start.year, start.month, start.day, tzinfo=dt_util.UTC)
end_time = start_date + timedelta(hours=24)
response = await client.get(
f"/api/logbook/{start_date.isoformat()}",
params={"end_time": end_time.isoformat()},
)
assert response.status == HTTPStatus.OK
json_dict = await response.json()
# Switch state change should be attributed to the climate entity
# and inherit user_id from the parent context
heater_entries = [
entry for entry in json_dict if entry.get("entity_id") == "switch.heater"
]
assert len(heater_entries) == 1
heater_entry = heater_entries[0]
assert heater_entry["context_entity_id"] == "climate.living_room"
assert heater_entry["context_entity_id_name"] == "Living Room Thermostat"
assert heater_entry["context_state"] == "heat"
assert heater_entry["context_user_id"] == "b400facee45711eaa9308bfd3d19e474"
async def test_context_user_ids_lru_eviction(
hass: HomeAssistant,
) -> None:
"""Test that the parent context user-id cache is bounded by LRU eviction.
The cache must keep memory bounded under sustained load. New entries
arriving after the cap evict the least recently used entries. An
early parent context whose entry has been evicted should no longer
contribute its user_id to a later child state change.
"""
user_id = "b400facee45711eaa9308bfd3d19e474"
early_parent_context = ha.Context(
id="01GTDGKBCH00GW0X476W5TVAAA",
user_id=user_id,
)
child_context = ha.Context(
id="01GTDGKBCH00GW0X476W5TVDDD",
parent_id=early_parent_context.id,
)
# for_live_stream=True enables the cache write; memoize_new_contexts=False
# keeps context_lookup empty so only the LRU can supply the user_id.
logbook_run = logbook.processor.LogbookRun(
context_lookup={None: None},
external_events={},
event_cache=logbook.processor.EventCache({}),
entity_name_cache=logbook.processor.EntityNameCache(hass),
include_entity_name=True,
timestamp=False,
memoize_new_contexts=False,
for_live_stream=True,
)
context_augmenter = logbook.processor.ContextAugmenter(logbook_run)
ent_reg = er.async_get(hass)
# __new__ bypasses __init__ so only the attributes humanify() touches
# need to be assigned below.
# NOTE(review): local name `processor` shadows any module-level import
# of the same name — confirm none is used later in this test.
processor = logbook.processor.EventProcessor.__new__(
logbook.processor.EventProcessor
)
processor.hass = hass
processor.ent_reg = ent_reg
processor.logbook_run = logbook_run
processor.context_augmenter = context_augmenter
hass.states.async_set("switch.heater", STATE_OFF)
await hass.async_block_till_done()
# Seed: the early parent SERVICE_CALL event populates the cache.
parent_row = MockRow(
EVENT_CALL_SERVICE,
{
ATTR_DOMAIN: "climate",
ATTR_SERVICE: "set_hvac_mode",
"service_data": {ATTR_ENTITY_ID: "climate.living_room"},
},
context=early_parent_context,
)
parent_row.context_only = True
parent_row.icon = None
processor.humanify([parent_row])
assert (
ulid_to_bytes_or_none(early_parent_context.id) in logbook_run.context_user_ids
)
# Flood the cache with MAX+1 unrelated parent contexts so the early
# parent is evicted from the front of the LRU.
filler_rows = []
for index in range(logbook.processor.MAX_CONTEXT_USER_IDS_CACHE + 1):
filler_context = ha.Context(
user_id=f"ffffffff{index:024x}"[:32],
)
filler_row = MockRow(
EVENT_CALL_SERVICE,
{
ATTR_DOMAIN: "test",
ATTR_SERVICE: "noop",
"service_data": {},
},
context=filler_context,
)
filler_row.context_only = True
filler_row.icon = None
filler_rows.append(filler_row)
processor.humanify(filler_rows)
assert (
len(logbook_run.context_user_ids)
== logbook.processor.MAX_CONTEXT_USER_IDS_CACHE
)
assert (
ulid_to_bytes_or_none(early_parent_context.id)
not in logbook_run.context_user_ids
)
# The child state change can no longer inherit the early parent's user_id
# because that entry was evicted.
child_row = MockRow(
PSEUDO_EVENT_STATE_CHANGED,
context=child_context,
)
child_row.state = STATE_ON
child_row.entity_id = "switch.heater"
child_row.icon = None
results = processor.humanify([child_row])
heater_entries = [e for e in results if e.get("entity_id") == "switch.heater"]
assert len(heater_entries) == 1
assert "context_user_id" not in heater_entries[0]
async def test_parent_user_attribution_does_not_use_origin_event_fallback(
hass: HomeAssistant,
) -> None:
"""Test that parent context lookup doesn't fall back to origin_event.
ContextAugmenter.get_context() has a fallback: when a context_id isn't in
context_lookup, it returns async_event_to_row(row.context.origin_event).
This fallback uses the *child row's* origin event, not the parent's,
so it can attribute the wrong user_id to a child context.
In practice this scenario is unlikely — child contexts don't carry a
user_id, so the origin_event fallback would return None for user_id
anyway. We guard against it nevertheless to ensure the lookup is
semantically correct: the parent context should only be resolved via
context_lookup, never via an unrelated fallback path.
Scenario:
- A user_id is set directly on child_context (not realistic, but
exercises the fallback path).
- Creating an Event with that context sets context.origin_event,
which carries the user_id.
- A state change for switch.heater uses that same child_context.
- The parent context is NOT in context_lookup (simulating live stream).
- The parent user_id should NOT be resolved via the origin_event fallback.
"""
wrong_user_id = "aaaaaaaaaaa711eaa9308bfd3d19e474"
parent_context = Context(id="01GTDGKBCH00GW0X476W5TVAAA")
# Child context whose origin_event will carry wrong_user_id
child_context = Context(
id="01GTDGKBCH00GW0X476W5TVDDD",
parent_id=parent_context.id,
user_id=wrong_user_id,
)
# Creating an Event sets context.origin_event = self, which carries
# wrong_user_id via child_context.user_id.
Event(EVENT_CALL_SERVICE, {}, context=child_context)
assert child_context.origin_event is not None
hass.states.async_set("switch.heater", STATE_OFF)
await hass.async_block_till_done()
# memoize_new_contexts=False keeps context_lookup empty, so the parent
# id can never resolve via the lookup table in this test.
logbook_run = logbook.processor.LogbookRun(
context_lookup={None: None},
external_events={},
event_cache=logbook.processor.EventCache({}),
entity_name_cache=logbook.processor.EntityNameCache(hass),
include_entity_name=True,
timestamp=False,
memoize_new_contexts=False,
)
context_augmenter = logbook.processor.ContextAugmenter(logbook_run)
ent_reg = er.async_get(hass)
# __new__ bypasses __init__; only the attributes humanify() reads are set.
processor = logbook.processor.EventProcessor.__new__(
logbook.processor.EventProcessor
)
processor.hass = hass
processor.ent_reg = ent_reg
processor.logbook_run = logbook_run
processor.context_augmenter = context_augmenter
# Build a child state-change EventAsRow with the child_context.
# The row itself has no user_id (context_user_id_bin=None) but
# the child_context.origin_event carries wrong_user_id.
child_row = EventAsRow(
row_id=1,
event_type=PSEUDO_EVENT_STATE_CHANGED,
event_data=None,
time_fired_ts=dt_util.utcnow().timestamp(),
context_id_bin=ulid_to_bytes_or_none(child_context.id),
context_user_id_bin=None,
context_parent_id_bin=ulid_to_bytes_or_none(child_context.parent_id),
state=STATE_ON,
entity_id="switch.heater",
icon=None,
context_only=False,
data={},
context=child_context,
)
results = processor.humanify([child_row])
heater_entries = [e for e in results if e.get("entity_id") == "switch.heater"]
assert len(heater_entries) == 1
# The parent context is unknown — no user should be attributed.
# If get_context's origin_event fallback is used, wrong_user_id leaks in.
assert "context_user_id" not in heater_entries[0]

View File

@@ -24,11 +24,13 @@ from homeassistant.const import (
ATTR_ENTITY_ID,
ATTR_FRIENDLY_NAME,
ATTR_NAME,
ATTR_SERVICE,
ATTR_UNIT_OF_MEASUREMENT,
CONF_DOMAINS,
CONF_ENTITIES,
CONF_EXCLUDE,
CONF_INCLUDE,
EVENT_CALL_SERVICE,
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_HOMEASSISTANT_START,
STATE_OFF,
@@ -41,6 +43,12 @@ from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component
from homeassistant.util import dt as dt_util
from .common import (
assert_thermostat_context_chain_events,
setup_thermostat_context_test_entities,
simulate_thermostat_context_chain,
)
from tests.common import MockConfigEntry, async_fire_time_changed
from tests.components.recorder.common import (
async_block_recorder,
@@ -3202,3 +3210,420 @@ async def test_consistent_stream_and_recorder_filtering(
results = response["result"]
assert len(results) == result_count
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_stream_user_id_from_parent_context(
hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
"""Test user attribution from parent context in live event stream.
Simulates the generic_thermostat pattern where a child context
(no user_id) is created for the heater service call, while the
parent context (from the user's set_hvac_mode call) has the user_id.
The live stream uses memoize_new_contexts=False, so context_lookup
is empty. User_id must be resolved via the context_user_ids map.
"""
await asyncio.gather(
*[
async_setup_component(hass, comp, {})
for comp in ("homeassistant", "logbook")
]
)
await hass.async_block_till_done()
setup_thermostat_context_test_entities(hass)
await hass.async_block_till_done()
await async_wait_recording_done(hass)
now = dt_util.utcnow()
websocket_client = await hass_ws_client()
# No end_time → subscription goes live after the historical window.
await websocket_client.send_json(
{"id": 7, "type": "logbook/event_stream", "start_time": now.isoformat()}
)
# First frame acks the subscription.
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == TYPE_RESULT
assert msg["success"]
# Receive historical events (partial) and sync message
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["event"]["partial"] is True
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["event"]["events"] == []
# Simulate the full generic_thermostat chain as live events
parent_context, _ = simulate_thermostat_context_chain(hass)
await hass.async_block_till_done()
msg = await asyncio.wait_for(websocket_client.receive_json(), 2)
assert msg["id"] == 7
assert msg["type"] == "event"
assert_thermostat_context_chain_events(msg["event"]["events"], parent_context)
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_stream_user_id_from_parent_context_filtered(
    hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
    """Test user attribution from parent context in filtered live event stream.

    Same scenario as test_logbook_stream_user_id_from_parent_context but
    with entity_ids in the subscription, matching what the frontend does.
    This exercises the filtered event subscription path where
    EVENT_CALL_SERVICE must be explicitly included and matched via
    service_data.
    """
    # Bring up the components the logbook stream depends on.
    await asyncio.gather(
        async_setup_component(hass, "homeassistant", {}),
        async_setup_component(hass, "logbook", {}),
    )
    await hass.async_block_till_done()
    setup_thermostat_context_test_entities(hass)
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    start = dt_util.utcnow()
    client = await hass_ws_client()

    async def next_message():
        """Receive the next websocket message with a short timeout."""
        return await asyncio.wait_for(client.receive_json(), 2)

    # Subscribe with entity_ids, matching what the frontend logbook card does.
    window_end = start + timedelta(hours=3)
    await client.send_json(
        {
            "id": 7,
            "type": "logbook/event_stream",
            "start_time": start.isoformat(),
            "end_time": window_end.isoformat(),
            "entity_ids": ["climate.living_room", "switch.heater"],
        }
    )
    reply = await next_message()
    assert reply["id"] == 7
    assert reply["type"] == TYPE_RESULT
    assert reply["success"]

    # Historical backfill (partial) followed by the empty sync message.
    reply = await next_message()
    assert reply["event"]["partial"] is True
    reply = await next_message()
    assert reply["event"]["events"] == []

    # Replay the full context chain as live events.
    parent_context, _ = simulate_thermostat_context_chain(hass)
    await hass.async_block_till_done()

    reply = await next_message()
    assert reply["id"] == 7
    assert reply["type"] == "event"
    assert_thermostat_context_chain_events(reply["event"]["events"], parent_context)
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_stream_parent_context_bridges_historical_to_live(
    hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
    """Test parent-context user attribution bridges the historical→live switch.

    Scenario: a user fires a service call (parent context) that triggers a
    child state change BEFORE the websocket subscription is opened. The
    parent's call_service event lives only in the historical window. After
    the historical backfill completes and the stream switches to live, a
    NEW state change reusing the same child context (whose parent_id points
    back at the historical parent) fires. The live event must inherit the
    user_id from the historical parent — which can only happen if the
    historical pre-pass populated the persistent LRU cache so the live
    consumer can find it.
    """
    # Bring up the components the logbook stream depends on.
    await asyncio.gather(
        async_setup_component(hass, "homeassistant", {}),
        async_setup_component(hass, "logbook", {}),
    )
    await hass.async_block_till_done()
    setup_thermostat_context_test_entities(hass)
    await hass.async_block_till_done()

    user_id = "b400facee45711eaa9308bfd3d19e474"
    parent_context = core.Context(
        id="01GTDGKBCH00GW0X476W5TVAAA",
        user_id=user_id,
    )
    child_context = core.Context(
        id="01GTDGKBCH00GW0X476W5TVDDD",
        parent_id=parent_context.id,
    )

    # Fire the parent service call and the first child state change BEFORE
    # the websocket subscription. These will live in the historical window.
    start_time = dt_util.utcnow()
    hass.bus.async_fire(
        EVENT_CALL_SERVICE,
        {
            ATTR_DOMAIN: "climate",
            ATTR_SERVICE: "set_hvac_mode",
            "service_data": {ATTR_ENTITY_ID: "climate.living_room"},
        },
        context=parent_context,
    )
    hass.bus.async_fire(
        EVENT_CALL_SERVICE,
        {
            ATTR_DOMAIN: "homeassistant",
            ATTR_SERVICE: "turn_on",
            "service_data": {ATTR_ENTITY_ID: "switch.heater"},
        },
        context=child_context,
    )
    hass.states.async_set(
        "switch.heater",
        STATE_ON,
        {ATTR_FRIENDLY_NAME: "Heater"},
        context=child_context,
    )
    await async_wait_recording_done(hass)

    # Open a filtered subscription. The filtered query path excludes the
    # parent's set_hvac_mode call_service from the historical row stream
    # because its event_data references climate.living_room, not
    # switch.heater. The pre-pass must fetch the parent and populate the
    # persistent LRU so the upcoming live event can resolve attribution.
    client = await hass_ws_client()

    async def next_message():
        """Receive the next websocket message with a short timeout."""
        return await asyncio.wait_for(client.receive_json(), 2)

    await client.send_json(
        {
            "id": 7,
            "type": "logbook/event_stream",
            "start_time": start_time.isoformat(),
            "entity_ids": ["switch.heater"],
        }
    )
    reply = await next_message()
    assert reply["id"] == 7
    assert reply["type"] == TYPE_RESULT
    assert reply["success"]

    # Drain the historical backfill messages.
    reply = await next_message()
    assert reply["event"]["partial"] is True
    heater_rows = [
        event
        for event in reply["event"]["events"]
        if event.get("entity_id") == "switch.heater"
    ]
    assert len(heater_rows) == 1
    assert heater_rows[0]["context_user_id"] == user_id
    reply = await next_message()
    assert reply["event"]["events"] == []

    # Stream is now live. Fire a NEW switch.heater state change reusing
    # child_context — its parent_id still points at the historical parent.
    # The live consumer must resolve the user_id via the persistent LRU
    # populated during the historical pre-pass.
    hass.states.async_set(
        "switch.heater",
        STATE_OFF,
        {ATTR_FRIENDLY_NAME: "Heater"},
        context=child_context,
    )
    await hass.async_block_till_done()

    reply = await next_message()
    assert reply["id"] == 7
    assert reply["type"] == "event"
    live_rows = [
        event
        for event in reply["event"]["events"]
        if event.get("entity_id") == "switch.heater"
    ]
    assert len(live_rows) == 1
    assert live_rows[0]["state"] == "off"
    assert live_rows[0]["context_user_id"] == user_id
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_get_events_user_id_from_parent_context(
    hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
    """Test user attribution from parent context in unfiltered historical logbook.

    Uses logbook/get_events without entity_ids, which triggers the
    unfiltered SQL query path.
    """
    # Bring up the components the logbook depends on.
    await asyncio.gather(
        async_setup_component(hass, "homeassistant", {}),
        async_setup_component(hass, "logbook", {}),
    )
    await hass.async_block_till_done()
    setup_thermostat_context_test_entities(hass)
    await hass.async_block_till_done()

    start = dt_util.utcnow()
    # Record the full context chain, then wait for it to be committed.
    parent_context, _ = simulate_thermostat_context_chain(hass)
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    client = await hass_ws_client()
    await client.send_json(
        {
            "id": 1,
            "type": "logbook/get_events",
            "start_time": start.isoformat(),
        }
    )
    reply = await client.receive_json()
    assert reply["success"]
    assert_thermostat_context_chain_events(reply["result"], parent_context)
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_get_events_user_id_from_parent_context_filtered(
    hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
    """Test user attribution from parent context in historical logbook with entity filter.

    Uses logbook/get_events with entity_ids, which triggers the filtered
    SQL query path. The query must also fetch parent context rows so that
    user_id can be inherited from the parent context.
    """
    # Bring up the components the logbook depends on.
    await asyncio.gather(
        async_setup_component(hass, "homeassistant", {}),
        async_setup_component(hass, "logbook", {}),
    )
    await hass.async_block_till_done()
    setup_thermostat_context_test_entities(hass)
    await hass.async_block_till_done()

    start = dt_util.utcnow()
    # Record the full context chain, then wait for it to be committed.
    parent_context, _ = simulate_thermostat_context_chain(hass)
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    client = await hass_ws_client()
    await client.send_json(
        {
            "id": 1,
            "type": "logbook/get_events",
            "start_time": start.isoformat(),
            "entity_ids": ["climate.living_room", "switch.heater"],
        }
    )
    reply = await client.receive_json()
    assert reply["success"]
    assert_thermostat_context_chain_events(reply["result"], parent_context)
@pytest.mark.usefixtures("recorder_mock")
async def test_logbook_stream_live_parent_service_call_only(
    hass: HomeAssistant, hass_ws_client: WebSocketGenerator
) -> None:
    """Test user attribution when parent context only appears on a service call.

    In the thermostat pattern, the parent context also appears on a state
    change for climate.living_room. This test covers the case where the
    parent context ONLY fires a call_service event (no state change with
    the parent context for any subscribed entity). The live consumer must
    still resolve the child's user_id from the parent's call_service event.
    This fails if EVENT_CALL_SERVICE is not subscribed to in the live stream.
    """
    # Bring up the components the logbook stream depends on.
    await asyncio.gather(
        async_setup_component(hass, "homeassistant", {}),
        async_setup_component(hass, "logbook", {}),
    )
    await hass.async_block_till_done()
    hass.states.async_set("switch.heater", STATE_OFF)
    await hass.async_block_till_done()
    await async_wait_recording_done(hass)

    start = dt_util.utcnow()
    client = await hass_ws_client()

    async def next_message():
        """Receive the next websocket message with a short timeout."""
        return await asyncio.wait_for(client.receive_json(), 2)

    window_end = start + timedelta(hours=3)
    await client.send_json(
        {
            "id": 7,
            "type": "logbook/event_stream",
            "start_time": start.isoformat(),
            "end_time": window_end.isoformat(),
            "entity_ids": ["switch.heater"],
        }
    )
    reply = await next_message()
    assert reply["id"] == 7
    assert reply["type"] == TYPE_RESULT
    assert reply["success"]

    # Drain historical backfill: partial window, then the empty sync message.
    reply = await next_message()
    assert reply["event"]["partial"] is True
    reply = await next_message()
    assert reply["event"]["events"] == []

    # Stream is now live. Fire a parent service call (no state change with
    # the parent context) followed by a child state change.
    user_id = "b400facee45711eaa9308bfd3d19e474"
    parent_context = core.Context(
        id="01GTDGKBCH00GW0X476W5TVAAA",
        user_id=user_id,
    )
    child_context = core.Context(
        id="01GTDGKBCH00GW0X476W5TVDDD",
        parent_id=parent_context.id,
    )
    # Only the service call carries the parent context — no state change
    # with parent_context for any subscribed entity.
    hass.bus.async_fire(
        EVENT_CALL_SERVICE,
        {
            ATTR_DOMAIN: "homeassistant",
            ATTR_SERVICE: "turn_on",
            "service_data": {ATTR_ENTITY_ID: "switch.heater"},
        },
        context=parent_context,
    )
    # Child state change with no user_id on its context.
    hass.states.async_set(
        "switch.heater",
        STATE_ON,
        {ATTR_FRIENDLY_NAME: "Heater"},
        context=child_context,
    )
    await hass.async_block_till_done()

    reply = await next_message()
    assert reply["id"] == 7
    assert reply["type"] == "event"
    heater_rows = [
        event
        for event in reply["event"]["events"]
        if event.get("entity_id") == "switch.heater"
    ]
    assert len(heater_rows) == 1
    assert heater_rows[0]["state"] == "on"
    assert heater_rows[0]["context_user_id"] == user_id