Compare commits

...

4 Commits

Author SHA1 Message Date
Erik 3a86b8cb81 Address review comments 2026-06-15 09:26:28 +02:00
Erik 8b2cb49fdc Adjust error handling 2026-06-15 09:21:58 +02:00
Erik 795bbd5a33 Update tests 2026-06-15 09:10:15 +02:00
Erik 2a1baa9573 Queue nested firing of events 2026-06-11 15:08:56 +02:00
5 changed files with 147 additions and 24 deletions
+62 -2
View File
@@ -5,7 +5,7 @@ of entities and react to changes.
"""
import asyncio
from collections import UserDict, defaultdict
from collections import UserDict, defaultdict, deque
from collections.abc import (
Callable,
Collection,
@@ -1428,10 +1428,24 @@ def _verify_event_type_length_or_raise(event_type: EventType[_DataT] | str) -> N
raise MaxLengthExceeded(event_type, "event_type", MAX_LENGTH_EVENT_EVENT_TYPE)
# Maximum number of events event listeners may queue while a single top-level
# event is being dispatched, to guard against event listeners firing events in
# an endless loop.
_MAX_QUEUED_EVENT_DISPATCHES: Final = 10_000
class EventBus:
"""Allow the firing of and listening for events."""
__slots__ = ("_debug", "_hass", "_listeners", "_match_all_listeners")
__slots__ = (
"_debug",
"_dispatching",
"_event_queue",
"_hass",
"_listeners",
"_match_all_listeners",
"_queued_event_count",
)
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize a new event bus."""
@@ -1441,6 +1455,11 @@ class EventBus:
self._match_all_listeners: list[_FilterableJobType[Any]] = []
self._listeners[MATCH_ALL] = self._match_all_listeners
self._hass = hass
self._event_queue: deque[
tuple[EventType[Any] | str, Any, EventOrigin, Context | None, float]
] = deque()
self._dispatching = False
self._queued_event_count = 0
self._async_logging_changed()
self.async_listen(EVENT_LOGGING_CHANGED, self._async_logging_changed)
@@ -1520,6 +1539,47 @@ class EventBus:
"Bus:Handling %s", _event_repr(event_type, origin, event_data)
)
if self._dispatching:
# A nested fire is queued and dispatched after the current
# dispatch. The fire time is captured now since dispatch is
# deferred.
if self._queued_event_count >= _MAX_QUEUED_EVENT_DISPATCHES:
# Guard against event listeners firing events in an endless
# loop: stop queuing further events and raise so the firing
# listener's error handling kicks in. Events already queued
# are still dispatched.
raise HomeAssistantError(
f"Event {event_type} not fired: more than"
f" {_MAX_QUEUED_EVENT_DISPATCHES} events were queued by event"
" listeners while dispatching a single event; event listeners"
" are likely firing events in an endless loop"
)
self._queued_event_count += 1
self._event_queue.append(
(event_type, event_data, origin, context, time_fired or time.time())
)
return
self._dispatching = True
self._queued_event_count = 0
try:
self._async_dispatch(event_type, event_data, origin, context, time_fired)
event_queue = self._event_queue
while event_queue:
self._async_dispatch(*event_queue.popleft())
finally:
self._dispatching = False
@callback
def _async_dispatch(
self,
event_type: EventType[_DataT] | str,
event_data: _DataT | None,
origin: EventOrigin,
context: Context | None,
time_fired: float | None,
) -> None:
"""Dispatch an event to its listeners."""
listeners = self._listeners.get(event_type, EMPTY_LIST)
if event_type not in EVENTS_EXCLUDED_FROM_MATCH_ALL:
match_all_listeners = self._match_all_listeners
+6 -2
View File
@@ -157,8 +157,12 @@ async def test_async_handle_source_entity_changes_source_entity_removed(
history_stats_config_entry.entry_id not in hass.config_entries.async_entry_ids()
)
# Check we got the expected events
assert events == ["remove"]
# Check we got the expected events: the helper entity's device link is
# cleared when the source device is removed (the helper entity belongs to
# the history_stats config entry, not the removed source config entry),
# then the helper entity is removed when the history_stats config entry is
# removed. Both registry actions are observed in fire order.
assert events == ["update", "remove"]
@pytest.mark.usefixtures("recorder_mock")
+6 -2
View File
@@ -145,8 +145,12 @@ async def test_async_handle_source_entity_changes_source_entity_removed(
# Check that the statistics config entry is removed
assert statistics_config_entry.entry_id not in hass.config_entries.async_entry_ids()
# Check we got the expected events
assert events == ["remove"]
# Check we got the expected events: the helper entity's device link is
# cleared when the source device is removed (the helper entity belongs to
# the statistics config entry, not the removed source config entry), then
# the helper entity is removed when the statistics config entry is removed.
# Both registry actions are observed in fire order.
assert events == ["update", "remove"]
async def test_async_handle_source_entity_changes_source_entity_removed_shared_device(
+6 -2
View File
@@ -177,8 +177,12 @@ async def test_async_handle_source_entity_changes_source_entity_removed(
# Check that the trend config entry is removed
assert trend_config_entry.entry_id not in hass.config_entries.async_entry_ids()
# Check we got the expected events
assert events == ["remove"]
# Check we got the expected events: the helper entity's device link is
# cleared when the source device is removed (the helper entity belongs to
# the trend config entry, not the removed source config entry), then the
# helper entity is removed when the trend config entry is removed. Both
# registry actions are observed in fire order.
assert events == ["update", "remove"]
async def test_async_handle_source_entity_changes_source_entity_removed_shared_device(
+67 -16
View File
@@ -1336,22 +1336,14 @@ async def test_eventbus_listen_once_run_immediately_coro(hass: HomeAssistant) ->
async def test_eventbus_nested_fire_dispatch_order(hass: HomeAssistant) -> None:
"""Test dispatch order when a listener fires an event synchronously.
Event dispatch is reentrant: an event fired from within a synchronous
listener is dispatched immediately, nested inside the dispatch of the
outer event.
The implementation of event listeners is such that listeners are called
in the order they were registered
As a result, the order in which a listener observes the two
events depends on its registration position relative to the listener
which fires the nested event: listeners registered before it observe
fire order, listeners registered after it observe the nested event
first.
This test documents the current behavior rather than guarantees it: a
non-reentrant (queued) dispatch would make all listeners observe fire
order.
Event dispatch is however non-reentrant: an event fired from within a
synchronous listener is queued and dispatched after the dispatch of the
outer event completes. All listeners therefore observe events in fire
order, regardless of their registration position relative to the
listener which fires the nested event.
"""
observed_before: list[str] = []
observed_after: list[str] = []
@@ -1378,15 +1370,74 @@ async def test_eventbus_nested_fire_dispatch_order(hass: HomeAssistant) -> None:
hass.bus.async_fire("test_outer")
# Registered before the nesting listener: observes fire order.
# All listeners observe fire order, regardless of registration position
# relative to the nesting listener.
assert observed_before == ["test_outer", "test_nested"]
# Registered after the nesting listener: observes inverted order.
assert observed_after == ["test_nested", "test_outer"]
assert observed_after == ["test_outer", "test_nested"]
for unsub in unsubs:
unsub()
async def test_eventbus_nested_fire_endless_loop_guard(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test that event listeners firing events in an endless loop are stopped.
A listener which unconditionally fires an event it also listens to would
keep the dispatch drain loop running forever. Once the per-dispatch queue
limit is reached, the bus stops queuing further events and raises in the
firing listener; the raise is caught and logged by the per-listener error
handling.
"""
calls: list[ha.Event] = []
@ha.callback
def refire(event: ha.Event) -> None:
calls.append(event)
hass.bus.async_fire("test_loop")
unsub = hass.bus.async_listen("test_loop", refire)
with patch.object(ha, "_MAX_QUEUED_EVENT_DISPATCHES", 10):
hass.bus.async_fire("test_loop")
# The top-level dispatch plus 10 queued dispatches; the next fire is
# rejected once the queue limit is reached. The firing listener raises,
# which the per-listener error handling catches and logs.
assert len(calls) == 11
assert "Error running job" in caplog.text
assert "are likely firing events in an endless loop" in caplog.text
unsub()
# The bus remains functional after the aborted dispatch
events = async_capture_events(hass, "test_after")
hass.bus.async_fire("test_after")
assert len(events) == 1
async def test_eventbus_fire_raises_when_queue_limit_reached(
hass: HomeAssistant,
) -> None:
"""Test a nested fire raises once the per-dispatch queue limit is reached.
A fire issued while an event is being dispatched is queued, but once the
limit is reached it is rejected with an error instead of being queued.
"""
# Simulate being in the middle of dispatching with the queue limit reached
hass.bus._dispatching = True
hass.bus._queued_event_count = ha._MAX_QUEUED_EVENT_DISPATCHES
try:
with pytest.raises(HomeAssistantError, match="endless loop"):
hass.bus.async_fire("test")
# The rejected event is not queued
assert len(hass.bus._event_queue) == 0
finally:
hass.bus._dispatching = False
hass.bus._queued_event_count = 0
async def test_eventbus_unsubscribe_listener(hass: HomeAssistant) -> None:
"""Test unsubscribe listener from returned function."""
calls = []