Compare commits

...

2 Commits

Author SHA1 Message Date
J. Nick Koston 974b5ece19 Verify state_changed unsubscribe is invoked on overflow
Add a spy on async_track_state_change_event so the test directly
asserts the live stream's unsubscribe callable is invoked when the
queue overflows, instead of relying on the (unreliable) shared
EVENT_STATE_CHANGED bus listener count.
2026-05-21 14:21:35 -05:00
J. Nick Koston 47d21a7204 Fix flaky test_overflow_queue in history websocket tests
Comparing the full bus listener dict can fail when unrelated components
asynchronously add or remove EVENT_STATE_CHANGED listeners during the
test; capture listeners before and after via a context manager that
excludes state_changed.
2026-05-21 14:08:03 -05:00
+103 -51
View File
@@ -1,7 +1,10 @@
"""The tests the History component websocket_api."""
import asyncio
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from datetime import timedelta
from typing import Any
from unittest.mock import ANY, patch
from freezegun import freeze_time
@@ -9,7 +12,12 @@ import pytest
from homeassistant.components import history
from homeassistant.components.history import websocket_api
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE, STATE_OFF, STATE_ON
from homeassistant.const import (
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_STATE_CHANGED,
STATE_OFF,
STATE_ON,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component
@@ -35,6 +43,27 @@ def listeners_without_writes(listeners: dict[str, int]) -> dict[str, int]:
}
@contextmanager
def assert_no_listener_leak(hass: HomeAssistant) -> Iterator[None]:
"""Capture bus listeners on entry, assert no leak on exit.
EVENT_STATE_CHANGED is excluded because unrelated components can
asynchronously add or remove state_changed listeners during a test.
"""
excluded = {EVENT_HOMEASSISTANT_FINAL_WRITE, EVENT_STATE_CHANGED}
def _snapshot() -> dict[str, int]:
return {
key: value
for key, value in hass.bus.async_listeners().items()
if key not in excluded
}
before = _snapshot()
yield
assert _snapshot() == before
@pytest.mark.usefixtures("hass_history")
def test_setup() -> None:
"""Test setup method of history."""
@@ -1571,7 +1600,28 @@ async def test_overflow_queue(
"""Test overflowing the history stream queue."""
now = dt_util.utcnow()
wanted_entities = ["sensor.two", "sensor.four", "sensor.one"]
with patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5):
unsub_calls = 0
def spy_track_state_change_event(*args: Any, **kwargs: Any) -> Callable[[], None]:
nonlocal unsub_calls
real_unsub = async_track_state_change_event(*args, **kwargs)
def wrapped_unsub() -> None:
nonlocal unsub_calls
unsub_calls += 1
real_unsub()
return wrapped_unsub
with (
patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5),
patch.object(
websocket_api,
"async_track_state_change_event",
spy_track_state_change_event,
),
):
await async_setup_component(
hass,
"history",
@@ -1595,61 +1645,63 @@ async def test_overflow_queue(
await async_wait_recording_done(hass)
client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
await client.send_json(
{
"id": 1,
"type": "history/stream",
"entity_ids": wanted_entities,
"start_time": now.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"
with assert_no_listener_leak(hass):
await client.send_json(
{
"id": 1,
"type": "history/stream",
"entity_ids": wanted_entities,
"start_time": now.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"
response = await client.receive_json()
first_end_time = sensor_two_last_updated_timestamp
response = await client.receive_json()
first_end_time = sensor_two_last_updated_timestamp
assert response == {
"event": {
"end_time": pytest.approx(first_end_time),
"start_time": pytest.approx(now.timestamp()),
"states": {
"sensor.one": [
{
"lu": pytest.approx(sensor_one_last_updated_timestamp),
"s": "on",
}
],
"sensor.two": [
{
"lu": pytest.approx(sensor_two_last_updated_timestamp),
"s": "off",
}
],
assert response == {
"event": {
"end_time": pytest.approx(first_end_time),
"start_time": pytest.approx(now.timestamp()),
"states": {
"sensor.one": [
{
"lu": pytest.approx(sensor_one_last_updated_timestamp),
"s": "on",
}
],
"sensor.two": [
{
"lu": pytest.approx(sensor_two_last_updated_timestamp),
"s": "off",
}
],
},
},
},
"id": 1,
"type": "event",
}
"id": 1,
"type": "event",
}
await async_recorder_block_till_done(hass)
# Overflow the queue
for val in range(10):
hass.states.async_set("sensor.one", str(val), attributes={"any": "attr"})
hass.states.async_set("sensor.two", str(val), attributes={"any": "attr"})
await async_recorder_block_till_done(hass)
await async_recorder_block_till_done(hass)
# Overflow the queue
for val in range(10):
hass.states.async_set(
"sensor.one", str(val), attributes={"any": "attr"}
)
hass.states.async_set(
"sensor.two", str(val), attributes={"any": "attr"}
)
await async_recorder_block_till_done(hass)
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
assert unsub_calls == 1
@pytest.mark.usefixtures("recorder_mock")