Compare commits

...

1 Commits

Author SHA1 Message Date
Erik 2a1baa9573 Queue nested firing of events 2026-06-11 15:08:56 +02:00
2 changed files with 109 additions and 18 deletions
+67 -2
View File
@@ -5,7 +5,7 @@ of entities and react to changes.
"""
import asyncio
from collections import UserDict, defaultdict
from collections import UserDict, defaultdict, deque
from collections.abc import (
Callable,
Collection,
@@ -1428,10 +1428,23 @@ def _verify_event_type_length_or_raise(event_type: EventType[_DataT] | str) -> N
raise MaxLengthExceeded(event_type, "event_type", MAX_LENGTH_EVENT_EVENT_TYPE)
# Maximum number of events fired by event listeners which will be dispatched
# from a single top-level fire, to guard against event listeners firing
# events in an endless loop.
_MAX_QUEUED_EVENT_DISPATCHES: Final = 10_000
class EventBus:
"""Allow the firing of and listening for events."""
__slots__ = ("_debug", "_hass", "_listeners", "_match_all_listeners")
__slots__ = (
"_debug",
"_dispatching",
"_fire_queue",
"_hass",
"_listeners",
"_match_all_listeners",
)
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize a new event bus."""
@@ -1441,6 +1454,10 @@ class EventBus:
self._match_all_listeners: list[_FilterableJobType[Any]] = []
self._listeners[MATCH_ALL] = self._match_all_listeners
self._hass = hass
self._fire_queue: deque[
tuple[EventType[Any] | str, Any, EventOrigin, Context | None, float]
] = deque()
self._dispatching = False
self._async_logging_changed()
self.async_listen(EVENT_LOGGING_CHANGED, self._async_logging_changed)
@@ -1520,6 +1537,54 @@ class EventBus:
"Bus:Handling %s", _event_repr(event_type, origin, event_data)
)
if self._dispatching:
# Non-reentrant dispatch: an event fired from within an event
# listener is queued and dispatched after the current dispatch
# completes, so all listeners observe events in fire order. The
# fire time is captured now since dispatch is deferred.
self._fire_queue.append(
(event_type, event_data, origin, context, time_fired or time.time())
)
return
self._dispatching = True
try:
self._async_dispatch(event_type, event_data, origin, context, time_fired)
if self._fire_queue:
self._async_drain_fire_queue()
finally:
self._dispatching = False
@callback
def _async_drain_fire_queue(self) -> None:
"""Dispatch events queued by event listeners, in fire order."""
fire_queue = self._fire_queue
dispatched = 0
while fire_queue:
if dispatched >= _MAX_QUEUED_EVENT_DISPATCHES:
_LOGGER.error(
"Aborting event dispatch: %d events were fired by event"
" listeners while dispatching a single event; event"
" listeners are likely firing events in an endless loop."
" Dropping queued events: %s",
dispatched,
", ".join(sorted({str(item[0]) for item in fire_queue})),
)
fire_queue.clear()
return
self._async_dispatch(*fire_queue.popleft())
dispatched += 1
@callback
def _async_dispatch(
self,
event_type: EventType[_DataT] | str,
event_data: _DataT | None,
origin: EventOrigin,
context: Context | None,
time_fired: float | None,
) -> None:
"""Dispatch an event to its listeners."""
listeners = self._listeners.get(event_type, EMPTY_LIST)
if event_type not in EVENTS_EXCLUDED_FROM_MATCH_ALL:
match_all_listeners = self._match_all_listeners
+42 -16
View File
@@ -1336,22 +1336,14 @@ async def test_eventbus_listen_once_run_immediately_coro(hass: HomeAssistant) ->
async def test_eventbus_nested_fire_dispatch_order(hass: HomeAssistant) -> None:
"""Test dispatch order when a listener fires an event synchronously.
Event dispatch is reentrant: an event fired from within a synchronous
listener is dispatched immediately, nested inside the dispatch of the
outer event.
The implementation of event listeners is such that listeners are called
in the order they were registered
As a result, the order in which a listener observes the two
events depends on its registration position relative to the listener
which fires the nested event: listeners registered before it observe
fire order, listeners registered after it observe the nested event
first.
This test documents the current behavior rather than guarantees it: a
non-reentrant (queued) dispatch would make all listeners observe fire
order.
Event dispatch is however non-reentrant: an event fired from within a
synchronous listener is queued and dispatched after the dispatch of the
outer event completes. All listeners therefore observe events in fire
order, regardless of their registration position relative to the
listener which fires the nested event.
"""
observed_before: list[str] = []
observed_after: list[str] = []
@@ -1378,15 +1370,49 @@ async def test_eventbus_nested_fire_dispatch_order(hass: HomeAssistant) -> None:
hass.bus.async_fire("test_outer")
# Registered before the nesting listener: observes fire order.
# All listeners observe fire order, regardless of registration position
# relative to the nesting listener.
assert observed_before == ["test_outer", "test_nested"]
# Registered after the nesting listener: observes inverted order.
assert observed_after == ["test_nested", "test_outer"]
assert observed_after == ["test_outer", "test_nested"]
for unsub in unsubs:
unsub()
async def test_eventbus_nested_fire_endless_loop_guard(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test that event listeners firing events in an endless loop are stopped.
Without the guard, a listener which unconditionally fires an event it
also listens to would keep the dispatch drain loop running forever.
"""
calls: list[ha.Event] = []
@ha.callback
def refire(event: ha.Event) -> None:
calls.append(event)
hass.bus.async_fire("test_loop")
unsub = hass.bus.async_listen("test_loop", refire)
with patch.object(ha, "_MAX_QUEUED_EVENT_DISPATCHES", 10):
hass.bus.async_fire("test_loop")
# The top-level dispatch plus 10 queued dispatches, then the loop is
# aborted and the queued event dropped.
assert len(calls) == 11
assert "listeners are likely firing events in an endless loop" in caplog.text
assert "test_loop" in caplog.text
unsub()
# The bus remains functional after the aborted dispatch
events = async_capture_events(hass, "test_after")
hass.bus.async_fire("test_after")
assert len(events) == 1
async def test_eventbus_unsubscribe_listener(hass: HomeAssistant) -> None:
"""Test unsubscribe listener from returned function."""
calls = []