mirror of
https://github.com/home-assistant/core.git
synced 2026-06-03 18:03:43 +02:00
sandbox_v2: delete RemoteStore; route writes via contextvar (Phase A2)
Phase A2 of plan-sandbox-context: remove the module-level `Store` rebinding now that the `current_sandbox` contextvar (A1) is the single source of truth for sandbox Store IO. Load-bearing correctness fix (surfaced by A1's STATUS): the contextvar save branch moves DOWN from `Store.async_save` to `Store._async_write_data`. `async_delay_save` and the FINAL_WRITE flush bypass `async_save` entirely — they funnel through `_async_handle_write_data` -> `_async_write_data`. While `RemoteStore` existed it overrode `_async_write_data` and masked this; deleting it would have silently routed delayed/final-write saves to the sandbox tempdir. Branching at `_async_write_data` covers async_save, async_delay_save, and FINAL_WRITE uniformly. The redundant `async_save` branch is removed. Deletions: - `hass_client/remote_store.py` (the subclass + installer) - `hass_client/tests/test_remote_store.py` (covered by the contextvar tests + the new delayed-save regression test) - the `install_remote_store` call/teardown in `SandboxRuntime.run` - the explicit `data.store` swap in `_load_restore_state` (the contextvar reaches the import-captured `Store` reference) New regression test `test_delayed_save_flushes_through_bridge` asserts `async_delay_save` + EVENT_HOMEASSISTANT_FINAL_WRITE route through the bridge. Docs (CLAUDE.md, OVERVIEW.md, FOLLOWUPS.md, architecture.html) rewritten around the contextvar. Tests: 190 core (sandbox_v2 + storage + restore_state) + 50 client all green; prek clean. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -63,8 +63,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Persist the sandbox's restore-state snapshot (Phase 9).
|
||||
|
||||
The runtime ships its ``RestoreEntity`` state in the shutdown
|
||||
reply rather than via ``RemoteStore`` (the reader task is busy
|
||||
dispatching the shutdown handler — a re-entrant store_save
|
||||
reply rather than via the sandbox store bridge (the reader task
|
||||
is busy dispatching the shutdown handler — a re-entrant store_save
|
||||
would deadlock). We route the payload through the bridge's
|
||||
store server so it lands at the same path the next run's
|
||||
warm-load reads from.
|
||||
@@ -106,8 +106,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
"""Stop every sandbox process on HA shutdown.
|
||||
|
||||
Phase 9: ask each sandbox to unload its entries and flush
|
||||
``RestoreEntity`` state through the Phase 8 ``RemoteStore``
|
||||
before pulling the plug. ``async_stop_all`` then handles SIGTERM
|
||||
``RestoreEntity`` state through the ``current_sandbox`` store
|
||||
bridge before pulling the plug. ``async_stop_all`` then handles SIGTERM
|
||||
/ SIGKILL for any sandbox that didn't ack the graceful request
|
||||
within the grace.
|
||||
"""
|
||||
|
||||
@@ -56,9 +56,9 @@ Sandbox → Main calls:
|
||||
Main → Sandbox shutdown (Phase 9):
|
||||
|
||||
* ``sandbox_v2/shutdown`` — ask the runtime to unload its entries, dump
|
||||
``RestoreEntity`` state through the Phase 8 :class:`RemoteStore`, fire
|
||||
``EVENT_HOMEASSISTANT_FINAL_WRITE`` so any pending Stores flush, and
|
||||
exit cleanly. Response ``{"ok": True, "unloaded": int, "restored":
|
||||
``RestoreEntity`` state, fire ``EVENT_HOMEASSISTANT_FINAL_WRITE`` so any
|
||||
pending Stores flush to main via the ``current_sandbox`` store bridge,
|
||||
and exit cleanly. Response ``{"ok": True, "unloaded": int, "restored":
|
||||
int}``. The runtime sets its shutdown event right after writing the
|
||||
reply, so the subprocess exits 0 on its own — main only needs SIGTERM
|
||||
if the round-trip times out.
|
||||
|
||||
@@ -479,16 +479,6 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
|
||||
"data": data,
|
||||
}
|
||||
|
||||
if sandbox := current_sandbox.get():
|
||||
# A sandbox runtime routes the wrapped envelope to main; the
|
||||
# bridge owns the serialise + transport. Bypass the local-disk
|
||||
# write machinery (write lock, manager cache, final-write
|
||||
# listener) entirely.
|
||||
wrapped = self._data
|
||||
self._data = None
|
||||
await sandbox.async_store_save(self.key, wrapped)
|
||||
return
|
||||
|
||||
if self.hass.state is CoreState.stopping:
|
||||
self._async_ensure_final_write_listener()
|
||||
return
|
||||
@@ -608,6 +598,17 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
|
||||
_LOGGER.error("Error writing config for %s: %s", self.key, err)
|
||||
|
||||
async def _async_write_data(self, data: dict) -> None:
|
||||
if sandbox := current_sandbox.get():
|
||||
# A sandbox runtime routes the wrapped envelope to main instead
|
||||
# of writing to local disk. Branching here (rather than in
|
||||
# async_save) is load-bearing: async_save, async_delay_save, and
|
||||
# the EVENT_HOMEASSISTANT_FINAL_WRITE flush all funnel their
|
||||
# writes through _async_handle_write_data -> _async_write_data,
|
||||
# so a single branch here covers every write path uniformly. The
|
||||
# bridge owns the envelope normalisation (resolving any pending
|
||||
# data_func), orjson preserialise, and transport.
|
||||
await sandbox.async_store_save(self.key, data)
|
||||
return
|
||||
if self._serialize_in_event_loop:
|
||||
if "data_func" in data:
|
||||
data["data"] = data.pop("data_func")()
|
||||
|
||||
+13
-4
@@ -47,8 +47,8 @@ second condition), as a deliberate call relying on git history for rollback.
|
||||
|
||||
- `hass_client/` — Python client library (its own `uv` env). Hosts
|
||||
`SandboxRuntime`, `FlowRunner`, `EntryRunner`, `EntityBridge`,
|
||||
`ServiceMirror`, `EventMirror`, `RemoteStore`, and the two pytest
|
||||
plugins under `hass_client/testing/`.
|
||||
`ServiceMirror`, `EventMirror`, `ChannelSandboxBridge`, and the two
|
||||
pytest plugins under `hass_client/testing/`.
|
||||
- `docs/` — per-phase decision write-ups.
|
||||
- `run_compat.py` + `COMPAT.md` + `COMPAT.csv` — compat-lane runner
|
||||
and report (Phase 10).
|
||||
@@ -58,7 +58,7 @@ The HA Core side of the integration lives at
|
||||
|
||||
## Core HA files modified (high-review surface)
|
||||
|
||||
v2 touches three core HA files. Each is intentional, small, and was
|
||||
v2 touches four core HA files. Each is intentional, small, and was
|
||||
introduced by a specific phase — see the matching STATUS file for
|
||||
the rationale.
|
||||
|
||||
@@ -80,10 +80,19 @@ the rationale.
|
||||
- `homeassistant/auth/models.py` + `auth/__init__.py` +
|
||||
`auth/auth_store.py` + `components/websocket_api/connection.py` —
|
||||
optional `RefreshToken.scopes` + dispatcher enforcement. **Phase 7.**
|
||||
- `homeassistant/helpers/sandbox_context.py` (NEW) +
|
||||
`homeassistant/helpers/storage.py` — the `current_sandbox`
|
||||
`ContextVar` + `SandboxBridge` `Protocol`, read by `Store`'s IO
|
||||
methods (`_async_load_data`, `_async_write_data`, `async_remove`) so
|
||||
sandbox `Store` IO routes to main at call time. This **replaced** the
|
||||
Phase 8 module-level `Store` rebinding — no more monkey-patch.
|
||||
**plan-sandbox-context (Phase A1 + A2).**
|
||||
|
||||
Iron Law: do **not** monkey-patch private internals. v1's direct
|
||||
write to `EntityComponent._platforms` is the cautionary tale —
|
||||
v2 took the slightly bigger PR to add the public hook instead.
|
||||
v2 took the slightly bigger PR to add the public hook instead. The
|
||||
Phase 8 `Store` rebinding was the same smell; plan-sandbox-context
|
||||
replaced it with the declared `current_sandbox` core HA hook.
|
||||
|
||||
## Open follow-ups (not yet shipped)
|
||||
|
||||
|
||||
+37
-22
@@ -46,7 +46,7 @@ inside the sandbox.
|
||||
| Config flow | Forwarded through host integration | Runs inside the sandbox; main owns the canonical `ConfigEntry` store |
|
||||
| Auth | System-user token, full HA scope | Scoped `RefreshToken` (`{"sandbox_v2/", "auth/current_user"}`); dispatcher rejects out-of-scope calls |
|
||||
| Data sharing | Sandbox sees all of main's state | Default locked-down; opt-in state/registry/area sharing per group is a future feature ([`docs/design-share-states.md`](docs/design-share-states.md)) |
|
||||
| Store routing | None — sandbox writes to its own tempdir | `RemoteStore` proxies every `Store(...)` to main; main writes to `<config>/.storage/sandbox_v2/<group>/<key>` |
|
||||
| Store routing | None — sandbox writes to its own tempdir | The `current_sandbox` contextvar makes `Store` IO proxy to main; main writes to `<config>/.storage/sandbox_v2/<group>/<key>` |
|
||||
| Shutdown | Best-effort | Graceful `sandbox_v2/shutdown` round-trip; sandbox unloads entries + dumps `RestoreEntity` state; main persists it for next boot |
|
||||
| Custom integrations | Out of scope | First-class — they route to the `custom` group |
|
||||
|
||||
@@ -84,7 +84,7 @@ The design choices and the failure modes of v1 they fix are recorded in
|
||||
│ │
|
||||
│ SandboxRuntime │
|
||||
│ • private HomeAssistant instance │
|
||||
│ • install_remote_store(channel) — rebinds storage.Store to RemoteStore │
|
||||
│ • current_sandbox.set(bridge) — routes Store IO to main via contextvar │
|
||||
│ • FlowRunner — drives integration ConfigFlow on entry_init / step / abort │
|
||||
│ • EntryRunner — runs async_setup_entry against the sandbox's hass │
|
||||
│ • EntityBridge — pushes register_entity + state_changed to main │
|
||||
@@ -190,12 +190,15 @@ On `EVENT_HOMEASSISTANT_STOP` the integration runs:
|
||||
4. `manager.async_stop_all()` falls through to SIGTERM, then SIGKILL,
|
||||
for any sandbox that didn't ack the graceful round-trip.
|
||||
|
||||
On the next boot the runtime warm-loads `core.restore_state` via an
|
||||
explicit `RemoteStore(hass, …, encoder=JSONEncoder)` before any handler
|
||||
registers, so the first `RestoreEntity.async_get_last_state()` sees the
|
||||
previous run's state. The warm-load can't go through the storage-module
|
||||
rebinding because `restore_state.py` captures `Store` at import time —
|
||||
the explicit `RemoteStore` instance is the workaround.
|
||||
On the next boot the runtime warm-loads `core.restore_state` before any
|
||||
handler registers, so the first `RestoreEntity.async_get_last_state()`
|
||||
sees the previous run's state. It works against a vanilla `Store`: the
|
||||
runtime sets `current_sandbox` before the warm-load, and `Store`'s IO
|
||||
methods read the contextvar at call time, so the load routes to main even
|
||||
though `restore_state.py` captured the original `Store` reference at
|
||||
import. (Phase 8 needed an explicit sandbox-backed `Store` instance here
|
||||
because its module-level rebinding couldn't reach that captured
|
||||
reference; the contextvar made that workaround unnecessary.)
|
||||
|
||||
## Config-flow forwarding
|
||||
|
||||
@@ -384,20 +387,32 @@ The full decision rationale for the auth side lives in
|
||||
|
||||
## Store routing
|
||||
|
||||
`RemoteStore` ([`hass_client/remote_store.py`](hass_client/hass_client/remote_store.py))
|
||||
subclasses `homeassistant.helpers.storage.Store` and overrides the
|
||||
three IO primitives — `_async_load_data`, `_async_write_data`,
|
||||
`async_remove` — to talk to main via `sandbox_v2/store_load`,
|
||||
`sandbox_v2/store_save`, `sandbox_v2/store_remove`. `delay_save`, the
|
||||
`EVENT_HOMEASSISTANT_FINAL_WRITE` hook, and the migration loop run
|
||||
unchanged from the base class.
|
||||
`homeassistant.helpers.storage.Store` reads a `current_sandbox`
|
||||
`ContextVar` (declared in
|
||||
[`homeassistant/helpers/sandbox_context.py`](../homeassistant/helpers/sandbox_context.py))
|
||||
at IO time. When it is set, `Store._async_load_data`,
|
||||
`Store._async_write_data`, and `Store.async_remove` delegate to the
|
||||
contextvar's `SandboxBridge` instead of touching local disk. Branching at
|
||||
`_async_write_data` (not `async_save`) is deliberate: `async_save`,
|
||||
`async_delay_save`, and the `EVENT_HOMEASSISTANT_FINAL_WRITE` flush all
|
||||
funnel through `_async_handle_write_data` → `_async_write_data`, so one
|
||||
branch there covers every write path. The migration loop in
|
||||
`_async_load_data` runs unchanged regardless of whether the wrapped
|
||||
envelope came from disk or the bridge.
|
||||
|
||||
`install_remote_store(channel)` runs right after the sandbox channel
|
||||
opens and before any per-runner handler registers. It rebinds
|
||||
`homeassistant.helpers.storage.Store` to `RemoteStore` for the lifetime
|
||||
of the sandbox process, so every `Store(...)` instantiated during
|
||||
`async_setup_entry` is a `RemoteStore`. The patch is process-wide
|
||||
because one sandbox process hosts one sandbox group.
|
||||
The sandbox runtime supplies the bridge:
|
||||
`ChannelSandboxBridge` ([`hass_client/sandbox_bridge.py`](hass_client/hass_client/sandbox_bridge.py))
|
||||
implements the three `SandboxBridge` store methods over
|
||||
`sandbox_v2/store_load`, `sandbox_v2/store_save`,
|
||||
`sandbox_v2/store_remove`. `SandboxRuntime.run` does
|
||||
`current_sandbox.set(ChannelSandboxBridge(channel))` right after the
|
||||
channel opens and **before** the warm-load and any per-runner handler
|
||||
registers, so every coroutine the runtime spawns inherits it (asyncio
|
||||
copies the context at `create_task` time). One sandbox process hosts one
|
||||
sandbox group, so a single bridge per runtime is correct. This replaced
|
||||
the Phase 8 module-level `Store` rebinding — no monkey-patch, and it
|
||||
reaches helpers like `restore_state` that captured the original `Store`
|
||||
reference at import.
|
||||
|
||||
On main, each `SandboxBridge` owns a `_SandboxStoreServer` pinned to
|
||||
`<config>/.storage/sandbox_v2/<group>/`. Writes use
|
||||
@@ -522,7 +537,7 @@ deferred, and what it flagged for the next phase. For a quick map:
|
||||
| Entity bridge | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py), [`entity/`](../homeassistant/components/sandbox_v2/entity/) | [`entry_runner.py`](hass_client/hass_client/entry_runner.py), [`entity_bridge.py`](hass_client/hass_client/entity_bridge.py) |
|
||||
| Service/event mirror | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py) | [`service_mirror.py`](hass_client/hass_client/service_mirror.py), [`event_mirror.py`](hass_client/hass_client/event_mirror.py), [`approved_domains.py`](hass_client/hass_client/approved_domains.py) |
|
||||
| Auth scopes | [`auth.py`](../homeassistant/components/sandbox_v2/auth.py), `homeassistant/auth/models.py`, `homeassistant/components/websocket_api/connection.py` | — |
|
||||
| Store routing | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py) (`_SandboxStoreServer`) | [`remote_store.py`](hass_client/hass_client/remote_store.py) |
|
||||
| Store routing | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py) (`_SandboxStoreServer`), `homeassistant/helpers/sandbox_context.py`, `homeassistant/helpers/storage.py` | [`sandbox_bridge.py`](hass_client/hass_client/sandbox_bridge.py) |
|
||||
| Shutdown | [`__init__.py`](../homeassistant/components/sandbox_v2/__init__.py) (`_on_stop`), `manager.py` | [`sandbox.py`](hass_client/hass_client/sandbox.py) (`_run_graceful_shutdown`) |
|
||||
| Test infra | — | [`testing/`](hass_client/hass_client/testing/), [`run_compat.py`](run_compat.py) |
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -35,7 +35,7 @@ dispatched task, so the reader keeps draining the wire even when the
|
||||
cap is hit. `SandboxRuntime._run_graceful_shutdown` now fires
|
||||
`EVENT_HOMEASSISTANT_FINAL_WRITE` (after setting `CoreState.final_write`
|
||||
and `await hass.async_block_till_done()`) so `delay_save` Stores flush
|
||||
through `RemoteStore` before the reply goes out.
|
||||
their pending writes to main before the reply goes out.
|
||||
|
||||
**Outcome.** 93 HA-core sandbox_v2 tests + 45 hass_client tests green
|
||||
(2 new channel tests covering reentrancy + the concurrency cap; 2 new
|
||||
@@ -253,6 +253,37 @@ v2 test updates. Reports: `COMPAT.md` + `COMPAT_FULL.md` + `BACKLOG.md`
|
||||
|
||||
---
|
||||
|
||||
## plan-sandbox-context — `current_sandbox` contextvar replaces the store rebinding
|
||||
|
||||
**Why.** Phase 8 routed sandbox `Store` IO to main by rebinding
|
||||
`homeassistant.helpers.storage.Store` at module scope — the `remote_store`
|
||||
installer swapped in a `Store` subclass for the lifetime of the process. That
|
||||
is the exact "do not monkey-patch private internals" smell the project's Iron
|
||||
Law calls out — the same shape v1 was the cautionary tale for. It also had a
|
||||
footgun: helpers that did `from .storage import Store` at import time
|
||||
(`restore_state`, the registries) captured the *original* class, so the
|
||||
rebinding couldn't reach them — `restore_state` needed an explicit per-instance
|
||||
`Store` swap as a workaround.
|
||||
|
||||
**What landed.** A declared core HA hook: `current_sandbox`, a module-level
|
||||
`ContextVar[SandboxBridge | None]` in
|
||||
`homeassistant/helpers/sandbox_context.py`, read by `Store._async_load_data`,
|
||||
`Store._async_write_data`, and `Store.async_remove` at IO time. A contextvar
|
||||
read inside the instance methods is a single source of truth no matter how
|
||||
`Store` was imported, so the `restore_state` workaround is gone. The sandbox
|
||||
runtime sets the contextvar to a `ChannelSandboxBridge` before the warm-load;
|
||||
asyncio's context copy on `create_task` propagates it to every handler. Shipped
|
||||
as **A1** (additive — contextvar branch alongside the rebinding) then **A2**
|
||||
(deleted `remote_store.py`, the installer, and the `restore_state` swap). A2's
|
||||
load-bearing detail: the save branch lives at `_async_write_data`, not
|
||||
`async_save`, so `async_delay_save` and the FINAL_WRITE flush — which bypass
|
||||
`async_save` — route to main too.
|
||||
|
||||
**Outcome.** Zero patched globals in the sandbox; `Store` routing is a declared
|
||||
hook. The "monkey-patch the storage module" tension is closed.
|
||||
|
||||
---
|
||||
|
||||
## Still open
|
||||
|
||||
These are the items that survived Phase 17 — see
|
||||
|
||||
@@ -1,210 +0,0 @@
|
||||
"""Sandbox-side :class:`RemoteStore` — routes ``Store`` IO to main.
|
||||
|
||||
Phase 8: the sandbox runtime swaps ``homeassistant.helpers.storage.Store``
|
||||
with :class:`RemoteStore` once the control channel is ready, so every
|
||||
``Store(hass, version, key, …)`` call from an integration returns a
|
||||
RemoteStore that proxies ``async_load`` / ``async_save`` / ``async_remove``
|
||||
to main. Main namespaces every key as
|
||||
``<config>/.storage/sandbox_v2/<group>/<key>`` so two sandbox processes —
|
||||
or main itself — can't read each other's data.
|
||||
|
||||
The patch installs lazily: registries that already loaded against the
|
||||
sandbox-private tempdir (``core.device_registry`` and friends) keep
|
||||
their local file backing. Integrations that instantiate ``Store`` after
|
||||
the install — i.e. during ``async_setup_entry`` — get a RemoteStore.
|
||||
|
||||
``delay_save`` semantics are unchanged: :class:`RemoteStore` overrides
|
||||
only the disk-IO primitives (:meth:`_async_load_data`,
|
||||
:meth:`_async_write_data`, :meth:`async_remove`); batching, the final-
|
||||
write hook, and the migration loop run unchanged from
|
||||
:class:`Store`.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from copy import deepcopy
|
||||
import inspect
|
||||
import json
|
||||
import logging
|
||||
from typing import Any, ClassVar
|
||||
|
||||
from homeassistant.exceptions import UnsupportedStorageVersionError
|
||||
from homeassistant.helpers import json as json_helper, storage as _storage
|
||||
from homeassistant.util.json import SerializationError
|
||||
|
||||
from .channel import Channel, ChannelClosedError, ChannelRemoteError
|
||||
from .protocol import MSG_STORE_LOAD, MSG_STORE_REMOVE, MSG_STORE_SAVE
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
_BaseStore = _storage.Store
|
||||
|
||||
|
||||
class RemoteStore(_BaseStore):
|
||||
"""``Store`` subclass that persists via :class:`Channel` to main.
|
||||
|
||||
Class-level :attr:`_channel` is set by :func:`install_remote_store`
|
||||
and cleared by the uninstall callable it returns. There is exactly
|
||||
one channel per sandbox runtime, so a class attribute is enough —
|
||||
every ``Store`` instance the sandbox builds shares it.
|
||||
"""
|
||||
|
||||
_channel: ClassVar[Channel | None] = None
|
||||
|
||||
@classmethod
|
||||
def _remote_channel(cls) -> Channel | None:
|
||||
return cls._channel
|
||||
|
||||
async def _async_load_data(self) -> Any:
|
||||
"""Load the wrapped payload from main, then run any migration.
|
||||
|
||||
Mirrors :meth:`Store._async_load_data` but bypasses the on-disk
|
||||
path and the file-cache manager — main is the source of truth.
|
||||
The migration block matches ``Store`` line-for-line so an
|
||||
integration that ships a ``_async_migrate_func`` keeps working.
|
||||
"""
|
||||
if self._load_empty:
|
||||
self.make_read_only()
|
||||
return None
|
||||
|
||||
if self._data is not None:
|
||||
data = self._data
|
||||
if "data_func" in data:
|
||||
data["data"] = data.pop("data_func")()
|
||||
data = deepcopy(data)
|
||||
else:
|
||||
wrapped = await self._remote_load()
|
||||
if wrapped is None:
|
||||
return None
|
||||
data = wrapped
|
||||
|
||||
if "minor_version" not in data:
|
||||
data["minor_version"] = 1
|
||||
|
||||
if (
|
||||
data["version"] == self.version
|
||||
and data["minor_version"] == self.minor_version
|
||||
):
|
||||
return data["data"]
|
||||
|
||||
if data["version"] > self._max_readable_version:
|
||||
raise UnsupportedStorageVersionError(
|
||||
self.key, data["version"], self._max_readable_version
|
||||
)
|
||||
_LOGGER.info(
|
||||
"Migrating %s storage from %s.%s to %s.%s",
|
||||
self.key,
|
||||
data["version"],
|
||||
data["minor_version"],
|
||||
self.version,
|
||||
self.minor_version,
|
||||
)
|
||||
if len(inspect.signature(self._async_migrate_func).parameters) == 2:
|
||||
stored = await self._async_migrate_func(data["version"], data["data"])
|
||||
else:
|
||||
try:
|
||||
stored = await self._async_migrate_func(
|
||||
data["version"], data["minor_version"], data["data"]
|
||||
)
|
||||
except NotImplementedError:
|
||||
if data["version"] != self.version:
|
||||
raise
|
||||
stored = data["data"]
|
||||
await self.async_save(stored)
|
||||
return stored
|
||||
|
||||
async def _remote_load(self) -> dict[str, Any] | None:
|
||||
"""Fetch the wrapped dict for ``self.key`` from main."""
|
||||
channel = self._remote_channel()
|
||||
if channel is None:
|
||||
_LOGGER.error("RemoteStore[%s]: load before install", self.key)
|
||||
return None
|
||||
try:
|
||||
wrapped = await channel.call(MSG_STORE_LOAD, {"key": self.key})
|
||||
except ChannelClosedError:
|
||||
_LOGGER.warning("RemoteStore[%s]: channel closed mid-load", self.key)
|
||||
return None
|
||||
except ChannelRemoteError as err:
|
||||
_LOGGER.warning("RemoteStore[%s] load failed: %s", self.key, err)
|
||||
return None
|
||||
if wrapped is None:
|
||||
return None
|
||||
if not isinstance(wrapped, dict):
|
||||
_LOGGER.error(
|
||||
"RemoteStore[%s]: main returned non-dict (%s)",
|
||||
self.key,
|
||||
type(wrapped).__name__,
|
||||
)
|
||||
return None
|
||||
return wrapped
|
||||
|
||||
async def _async_write_data(self, data: dict) -> None:
|
||||
"""Push the wrapped payload to main instead of writing to disk.
|
||||
|
||||
``Store`` callers may hand us HA-specific types (``Fragment`` from
|
||||
``State.json_fragment``, ``set``/``tuple``, ``datetime``, ``Path``,
|
||||
``as_dict``-shaped objects). The channel transports plain JSON, so
|
||||
we run the payload through orjson's HA-aware encoder first and
|
||||
parse the resulting bytes back to primitives before handing it
|
||||
off. Same trip ``Store.async_save`` would take on its way to disk
|
||||
— we just intercept before the bytes hit a file.
|
||||
"""
|
||||
channel = self._remote_channel()
|
||||
if channel is None:
|
||||
_LOGGER.error("RemoteStore[%s]: save before install", self.key)
|
||||
return
|
||||
if "data_func" in data:
|
||||
data["data"] = data.pop("data_func")()
|
||||
try:
|
||||
_mode, json_bytes = json_helper.prepare_save_json(data, encoder=None)
|
||||
payload = json.loads(json_bytes)
|
||||
except SerializationError:
|
||||
_LOGGER.exception("RemoteStore[%s]: payload not serialisable", self.key)
|
||||
return
|
||||
try:
|
||||
await channel.call(MSG_STORE_SAVE, {"key": self.key, "data": payload})
|
||||
except ChannelClosedError:
|
||||
_LOGGER.warning("RemoteStore[%s]: channel closed mid-save", self.key)
|
||||
except ChannelRemoteError as err:
|
||||
_LOGGER.error("RemoteStore[%s] save failed: %s", self.key, err)
|
||||
|
||||
async def async_remove(self) -> None:
|
||||
"""Mirror :meth:`Store.async_remove` but unlink on main, not locally."""
|
||||
self._manager.async_invalidate(self.key)
|
||||
self._async_cleanup_delay_listener()
|
||||
self._async_cleanup_final_write_listener()
|
||||
channel = self._remote_channel()
|
||||
if channel is None:
|
||||
return
|
||||
try:
|
||||
await channel.call(MSG_STORE_REMOVE, {"key": self.key})
|
||||
except ChannelClosedError:
|
||||
_LOGGER.warning("RemoteStore[%s]: channel closed mid-remove", self.key)
|
||||
except ChannelRemoteError as err:
|
||||
_LOGGER.warning("RemoteStore[%s] remove failed: %s", self.key, err)
|
||||
|
||||
|
||||
def install_remote_store(channel: Channel) -> Callable[[], None]:
|
||||
"""Patch ``homeassistant.helpers.storage.Store`` with :class:`RemoteStore`.
|
||||
|
||||
Returns an idempotent uninstall callable that restores the original
|
||||
``Store`` class. Tests that don't tear the patch down at fixture
|
||||
teardown will leak the patch into the next test, so always call the
|
||||
returned uninstaller.
|
||||
|
||||
The patch is process-wide on purpose — every ``Store(...)`` call
|
||||
inside the sandbox routes through ``RemoteStore``. The sandbox
|
||||
process hosts one sandbox group, so a single class-level
|
||||
:attr:`RemoteStore._channel` is correct.
|
||||
"""
|
||||
RemoteStore._channel = channel # noqa: SLF001 — own class attr
|
||||
_storage.Store = RemoteStore # type: ignore[misc]
|
||||
|
||||
def _uninstall() -> None:
|
||||
if _storage.Store is RemoteStore:
|
||||
_storage.Store = _BaseStore # type: ignore[misc]
|
||||
RemoteStore._channel = None # noqa: SLF001
|
||||
|
||||
return _uninstall
|
||||
|
||||
|
||||
__all__ = ["RemoteStore", "install_remote_store"]
|
||||
@@ -32,7 +32,6 @@ from typing import Any
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE
|
||||
from homeassistant.core import CoreState
|
||||
from homeassistant.helpers import json as json_helper, restore_state
|
||||
from homeassistant.helpers.json import JSONEncoder
|
||||
from homeassistant.helpers.sandbox_context import current_sandbox
|
||||
|
||||
from .approved_domains import ApprovedDomains
|
||||
@@ -42,7 +41,6 @@ from .entry_runner import EntryRunner
|
||||
from .event_mirror import EventMirror
|
||||
from .flow_runner import FlowRunner
|
||||
from .protocol import MSG_SHUTDOWN
|
||||
from .remote_store import RemoteStore, install_remote_store
|
||||
from .sandbox_bridge import ChannelSandboxBridge
|
||||
from .service_mirror import ServiceMirror
|
||||
|
||||
@@ -160,7 +158,6 @@ class SandboxRuntime:
|
||||
sys.stdout.flush()
|
||||
|
||||
self._channel = await self._channel_factory()
|
||||
uninstall_remote_store: Callable[[], None] | None = None
|
||||
sandbox_token: Any = None
|
||||
if self._channel is not None:
|
||||
# Route every `Store` IO to main via `current_sandbox`. The
|
||||
@@ -185,21 +182,10 @@ class SandboxRuntime:
|
||||
"one event loop? (see plan Risk #3)"
|
||||
)
|
||||
sandbox_token = current_sandbox.set(ChannelSandboxBridge(self._channel))
|
||||
# Phase A1 keeps `install_remote_store` alongside the contextvar
|
||||
# (both paths active). The contextvar branch is the first line of
|
||||
# each `Store` IO method, so it serves the IO; the rebinding is
|
||||
# the legacy path A2 deletes once A1 bakes on dev.
|
||||
#
|
||||
# Patch `homeassistant.helpers.storage.Store` BEFORE any
|
||||
# integration imports it: every entry_setup that arrives
|
||||
# below will instantiate Stores that route to main. Registries
|
||||
# already created against the sandbox tempdir keep their
|
||||
# local backing — only post-install instantiations are routed.
|
||||
uninstall_remote_store = install_remote_store(self._channel)
|
||||
# Phase 9: start the channel reader first so the warm-load
|
||||
# round-trip can resolve, then pre-load this sandbox group's
|
||||
# restore-state cache via the freshly-installed RemoteStore.
|
||||
# The data lives on main under
|
||||
# restore-state cache. The contextvar (set above) routes the
|
||||
# load to main. The data lives on main under
|
||||
# ``.storage/sandbox_v2/<group>/core.restore_state`` and was
|
||||
# written by the previous run's shutdown handler. Bare HA —
|
||||
# no bootstrap — so we call it ourselves; any RestoreEntity
|
||||
@@ -229,8 +215,6 @@ class SandboxRuntime:
|
||||
await self._entity_bridge.async_stop()
|
||||
if self._channel is not None:
|
||||
await self._channel.close()
|
||||
if uninstall_remote_store is not None:
|
||||
uninstall_remote_store()
|
||||
if sandbox_token is not None:
|
||||
# Tidy test isolation; in prod the process exits anyway.
|
||||
current_sandbox.reset(sandbox_token)
|
||||
@@ -262,12 +246,13 @@ class SandboxRuntime:
|
||||
|
||||
Phase 12 fires ``EVENT_HOMEASSISTANT_FINAL_WRITE`` and waits for
|
||||
the bus to drain so ``Store``s with pending ``async_delay_save``
|
||||
writes flush to main via ``RemoteStore`` — the now-concurrent
|
||||
channel dispatcher means the re-entrant ``MSG_STORE_SAVE`` call
|
||||
each flush issues no longer deadlocks against this handler.
|
||||
writes flush to main via the ``current_sandbox`` bridge — the
|
||||
now-concurrent channel dispatcher means the re-entrant
|
||||
``MSG_STORE_SAVE`` call each flush issues no longer deadlocks
|
||||
against this handler.
|
||||
|
||||
Restore state is still **collected** (not flushed via
|
||||
``RemoteStore``) and returned in this reply: ``core.restore_state``
|
||||
Restore state is still **collected** (not flushed via the
|
||||
bridge) and returned in this reply: ``core.restore_state``
|
||||
is owned by the runtime's explicit warm-load / shutdown-dump path,
|
||||
not by an integration's ``Store``, so it doesn't ride the
|
||||
FINAL_WRITE flush. Shipping it back in the reply keeps the data
|
||||
@@ -299,8 +284,8 @@ class SandboxRuntime:
|
||||
|
||||
# Phase 12: fire FINAL_WRITE so ``async_delay_save``-using
|
||||
# ``Store``s flush their pending data. Concurrent channel
|
||||
# dispatcher means each ``RemoteStore`` write can re-enter the
|
||||
# channel without deadlocking against this handler.
|
||||
# dispatcher means each bridge write can re-enter the channel
|
||||
# without deadlocking against this handler.
|
||||
try:
|
||||
hass.set_state(CoreState.final_write)
|
||||
hass.bus.async_fire_internal(EVENT_HOMEASSISTANT_FINAL_WRITE)
|
||||
@@ -351,20 +336,14 @@ async def _load_restore_state(hass: Any) -> None:
|
||||
through ``async_start``, so we skip that listener and rely on
|
||||
Phase 9's shutdown handler to force the final dump.
|
||||
|
||||
``RestoreStateData`` imports ``Store`` via ``from .storage import
|
||||
Store`` at module-load time, so Phase 8's
|
||||
:func:`install_remote_store` (which rebinds
|
||||
``helpers.storage.Store``) cannot reach it. We swap the singleton's
|
||||
``store`` attribute with a :class:`RemoteStore` explicitly so the
|
||||
load — and the later shutdown dump — round-trip through main.
|
||||
No store swap is needed: ``RestoreStateData`` builds a vanilla
|
||||
``Store``, and ``Store.async_load`` reads ``current_sandbox`` at call
|
||||
time. Because the runtime set the contextvar before calling us, the
|
||||
load — and the later shutdown dump — round-trip through main no matter
|
||||
that ``restore_state.py`` captured the original ``Store`` reference at
|
||||
import time.
|
||||
"""
|
||||
data = restore_state.async_get(hass)
|
||||
data.store = RemoteStore(
|
||||
hass,
|
||||
restore_state.STORAGE_VERSION,
|
||||
restore_state.STORAGE_KEY,
|
||||
encoder=JSONEncoder,
|
||||
)
|
||||
try:
|
||||
await data.async_load()
|
||||
except Exception:
|
||||
|
||||
@@ -6,8 +6,8 @@ control channel: the three ``Store`` IO methods delegate to main via the
|
||||
namespaces every key as ``<config>/.storage/sandbox_v2/<group>/<key>`` so
|
||||
two sandbox processes — or main itself — can't read each other's data.
|
||||
|
||||
The bodies are lifted from the pre-contextvar :class:`RemoteStore` (which
|
||||
this primitive replaces): same load semantics, same orjson preserialise on
|
||||
The bodies are lifted from the pre-contextvar Phase 8 store subclass that
|
||||
this primitive replaced: same load semantics, same orjson preserialise on
|
||||
save, same channel error handling. The difference is *how* it's wired —
|
||||
``Store`` reads ``current_sandbox`` at call time instead of being rebound
|
||||
at module scope.
|
||||
|
||||
@@ -1,218 +0,0 @@
|
||||
"""Phase 8 tests for :class:`hass_client.remote_store.RemoteStore`.
|
||||
|
||||
Each test runs the patched-in ``Store`` against an in-memory channel
|
||||
pair that pretends to be main. We assert RemoteStore proxies all three
|
||||
operations correctly, runs migration when versions differ, and that the
|
||||
install/uninstall pair restores ``homeassistant.helpers.storage.Store``
|
||||
to its original value.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import tempfile
|
||||
from typing import Any
|
||||
|
||||
from hass_client.channel import Channel
|
||||
from hass_client.flow_runner import FlowRunner
|
||||
from hass_client.remote_store import RemoteStore, install_remote_store
|
||||
import pytest
|
||||
|
||||
from homeassistant.helpers import storage as _storage
|
||||
|
||||
|
||||
class _LoopbackWriter:
|
||||
def __init__(self, target: asyncio.StreamReader) -> None:
|
||||
self._target = target
|
||||
|
||||
def write(self, data: bytes) -> None:
|
||||
self._target.feed_data(data)
|
||||
|
||||
async def drain(self) -> None:
|
||||
return None
|
||||
|
||||
def close(self) -> None:
|
||||
self._target.feed_eof()
|
||||
|
||||
async def wait_closed(self) -> None:
|
||||
return None
|
||||
|
||||
|
||||
def _make_channel_pair() -> tuple[Channel, Channel]:
|
||||
reader_a = asyncio.StreamReader()
|
||||
reader_b = asyncio.StreamReader()
|
||||
return (
|
||||
Channel(reader_a, _LoopbackWriter(reader_b), name="main"), # type: ignore[arg-type]
|
||||
Channel(reader_b, _LoopbackWriter(reader_a), name="sandbox"), # type: ignore[arg-type]
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="channels")
|
||||
async def _channels_fixture() -> tuple[Channel, Channel]:
|
||||
main, sandbox = _make_channel_pair()
|
||||
yield main, sandbox
|
||||
await main.close()
|
||||
await sandbox.close()
|
||||
|
||||
|
||||
@pytest.fixture(name="hass_runtime")
|
||||
async def _hass_runtime_fixture():
|
||||
with tempfile.TemporaryDirectory(prefix="sandbox_v2_remote_store_") as tmp:
|
||||
flow_runner = await FlowRunner.create(config_dir=tmp)
|
||||
try:
|
||||
yield flow_runner.hass
|
||||
finally:
|
||||
await flow_runner.async_stop()
|
||||
|
||||
|
||||
@pytest.fixture(name="installed_store")
|
||||
def _installed_store_fixture(channels: tuple[Channel, Channel]):
|
||||
"""Install RemoteStore on the channel pair and tear it down afterwards."""
|
||||
_main, sandbox = channels
|
||||
uninstall = install_remote_store(sandbox)
|
||||
yield sandbox
|
||||
uninstall()
|
||||
|
||||
|
||||
async def test_install_replaces_store_symbol(channels: tuple[Channel, Channel]) -> None:
|
||||
"""``install_remote_store`` patches ``storage.Store`` and uninstall restores it."""
|
||||
_main, sandbox = channels
|
||||
original = _storage.Store
|
||||
uninstall = install_remote_store(sandbox)
|
||||
try:
|
||||
assert _storage.Store is RemoteStore
|
||||
finally:
|
||||
uninstall()
|
||||
assert _storage.Store is original
|
||||
assert RemoteStore._channel is None # noqa: SLF001
|
||||
|
||||
|
||||
async def test_save_then_load_round_trip(
|
||||
channels: tuple[Channel, Channel],
|
||||
hass_runtime: Any,
|
||||
installed_store: Channel,
|
||||
) -> None:
|
||||
"""A round-trip through a fake main returns the saved data."""
|
||||
main, _sandbox = channels
|
||||
saved: dict[str, Any] = {}
|
||||
|
||||
async def _on_save(payload: dict[str, Any]) -> dict[str, bool]:
|
||||
saved[payload["key"]] = payload["data"]
|
||||
return {"ok": True}
|
||||
|
||||
async def _on_load(payload: dict[str, Any]) -> dict[str, Any] | None:
|
||||
return saved.get(payload["key"])
|
||||
|
||||
main.register("sandbox_v2/store_save", _on_save)
|
||||
main.register("sandbox_v2/store_load", _on_load)
|
||||
main.start()
|
||||
installed_store.start()
|
||||
|
||||
store = _storage.Store(hass_runtime, 1, "phase8_demo")
|
||||
await store.async_save({"hello": "world"})
|
||||
|
||||
assert saved["phase8_demo"]["data"] == {"hello": "world"}
|
||||
assert saved["phase8_demo"]["version"] == 1
|
||||
|
||||
# Fresh Store with no in-memory data must round-trip through main.
|
||||
other = _storage.Store(hass_runtime, 1, "phase8_demo")
|
||||
loaded = await other.async_load()
|
||||
assert loaded == {"hello": "world"}
|
||||
|
||||
|
||||
async def test_load_returns_none_when_main_has_no_data(
|
||||
channels: tuple[Channel, Channel],
|
||||
hass_runtime: Any,
|
||||
installed_store: Channel,
|
||||
) -> None:
|
||||
"""A missing key reads back as ``None`` (matching ``Store`` semantics)."""
|
||||
main, _sandbox = channels
|
||||
|
||||
async def _on_load(_payload: dict[str, Any]) -> None:
|
||||
return None
|
||||
|
||||
main.register("sandbox_v2/store_load", _on_load)
|
||||
main.start()
|
||||
installed_store.start()
|
||||
|
||||
store = _storage.Store(hass_runtime, 1, "missing_key")
|
||||
assert await store.async_load() is None
|
||||
|
||||
|
||||
async def test_remove_proxies_to_main(
|
||||
channels: tuple[Channel, Channel],
|
||||
hass_runtime: Any,
|
||||
installed_store: Channel,
|
||||
) -> None:
|
||||
"""``async_remove`` fires a ``sandbox_v2/store_remove`` RPC."""
|
||||
main, _sandbox = channels
|
||||
removed: list[str] = []
|
||||
|
||||
async def _on_remove(payload: dict[str, Any]) -> dict[str, bool]:
|
||||
removed.append(payload["key"])
|
||||
return {"ok": True}
|
||||
|
||||
main.register("sandbox_v2/store_remove", _on_remove)
|
||||
main.start()
|
||||
installed_store.start()
|
||||
|
||||
store = _storage.Store(hass_runtime, 1, "drop_me")
|
||||
await store.async_remove()
|
||||
|
||||
assert removed == ["drop_me"]
|
||||
|
||||
|
||||
async def test_migration_runs_when_version_differs(
|
||||
channels: tuple[Channel, Channel],
|
||||
hass_runtime: Any,
|
||||
installed_store: Channel,
|
||||
) -> None:
|
||||
"""A wrapped v1 payload prompts the subclass's migrate_func + a write back."""
|
||||
main, _sandbox = channels
|
||||
saved: dict[str, Any] = {}
|
||||
|
||||
async def _on_save(payload: dict[str, Any]) -> dict[str, bool]:
|
||||
saved[payload["key"]] = payload["data"]
|
||||
return {"ok": True}
|
||||
|
||||
async def _on_load(_payload: dict[str, Any]) -> dict[str, Any]:
|
||||
return {
|
||||
"version": 1,
|
||||
"minor_version": 1,
|
||||
"key": "phase8_migrating",
|
||||
"data": {"shape": "old"},
|
||||
}
|
||||
|
||||
main.register("sandbox_v2/store_save", _on_save)
|
||||
main.register("sandbox_v2/store_load", _on_load)
|
||||
main.start()
|
||||
installed_store.start()
|
||||
|
||||
class _MigratingStore(_storage.Store):
|
||||
async def _async_migrate_func(
|
||||
self,
|
||||
old_major_version: int,
|
||||
old_minor_version: int,
|
||||
old_data: dict[str, Any],
|
||||
) -> dict[str, Any]:
|
||||
return {"shape": "new", "was": old_data["shape"]}
|
||||
|
||||
store = _MigratingStore(hass_runtime, 2, "phase8_migrating")
|
||||
loaded = await store.async_load()
|
||||
assert loaded == {"shape": "new", "was": "old"}
|
||||
# Migration also wrote the post-migration shape back to main.
|
||||
assert saved["phase8_migrating"]["version"] == 2
|
||||
assert saved["phase8_migrating"]["data"] == {"shape": "new", "was": "old"}
|
||||
|
||||
|
||||
async def test_load_with_channel_uninstalled_returns_none(
|
||||
channels: tuple[Channel, Channel],
|
||||
hass_runtime: Any,
|
||||
) -> None:
|
||||
"""A RemoteStore without an installed channel doesn't deadlock."""
|
||||
# Don't install — RemoteStore._channel stays None.
|
||||
main, sandbox = channels
|
||||
main.start()
|
||||
sandbox.start()
|
||||
# Manually construct a RemoteStore (the patched Store path is what the
|
||||
# install fixture exercises; this case is the diagnostic-only path).
|
||||
store = RemoteStore(hass_runtime, 1, "orphan")
|
||||
assert await store.async_load() is None
|
||||
@@ -1,17 +1,18 @@
|
||||
"""Phase A1 tests for the ``current_sandbox`` contextvar store routing.
|
||||
"""Tests for the ``current_sandbox`` contextvar store routing.
|
||||
|
||||
These exercise the new routing primitive that replaces the module-level
|
||||
``Store`` rebinding (``install_remote_store`` / :class:`RemoteStore`):
|
||||
These exercise the routing primitive that replaced the former
|
||||
module-level ``Store`` rebinding (the deleted ``remote_store`` module):
|
||||
|
||||
* The first five tests drive ``Store``'s public API through the contextvar
|
||||
branch using an in-memory ``_FakeBridge`` set on ``current_sandbox``
|
||||
directly — no channel. They map 1:1 to the plan's "Phase A1 — Tests
|
||||
added" list.
|
||||
* Most tests drive ``Store``'s public API through the contextvar branch
|
||||
using an in-memory ``_FakeBridge`` set on ``current_sandbox`` directly —
|
||||
no channel. They cover load/unwrap, missing keys, migration, the
|
||||
no-sandbox disk path, the ``restore_state`` warm-load, contextvar task
|
||||
inheritance, and (the A2 guard) the ``async_delay_save`` / FINAL_WRITE
|
||||
flush.
|
||||
* The final test exercises the concrete :class:`ChannelSandboxBridge` over
|
||||
an in-memory channel pair, covering the wire mapping of the new
|
||||
``sandbox_bridge.py`` file directly (in A1 the same mapping is still
|
||||
covered transitively by ``test_remote_store.py``; A2 keeps this one when
|
||||
it deletes that file).
|
||||
an in-memory channel pair, covering the wire mapping of
|
||||
``sandbox_bridge.py`` directly — the coverage ``test_remote_store.py``
|
||||
used to provide transitively before A2 deleted it.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
@@ -24,6 +25,7 @@ from hass_client.flow_runner import FlowRunner
|
||||
from hass_client.sandbox_bridge import ChannelSandboxBridge
|
||||
import pytest
|
||||
|
||||
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers import restore_state, storage as _storage
|
||||
from homeassistant.helpers.sandbox_context import current_sandbox
|
||||
@@ -48,6 +50,11 @@ class _FakeBridge:
|
||||
return self.loaded.get(key)
|
||||
|
||||
async def async_store_save(self, key: str, data: Any) -> None:
|
||||
# Mirror ``ChannelSandboxBridge``: resolve a deferred ``data_func``
|
||||
# (handed down by ``async_delay_save``) into a concrete ``data`` key
|
||||
# before recording the envelope.
|
||||
if "data_func" in data:
|
||||
data["data"] = data.pop("data_func")()
|
||||
self.saved[key] = data
|
||||
|
||||
async def async_store_remove(self, key: str) -> None:
|
||||
@@ -183,8 +190,8 @@ async def test_restore_state_warm_load_without_workaround(
|
||||
) -> None:
|
||||
"""``RestoreStateData`` warm-load reaches the bridge with no store swap.
|
||||
|
||||
The smoking gun that the contextvar fix subsumes the
|
||||
``data.store = RemoteStore(...)`` workaround in ``sandbox.py``: a vanilla
|
||||
The smoking gun that the contextvar fix subsumes the explicit
|
||||
``data.store`` swap A2 deleted from ``sandbox.py``: a vanilla
|
||||
``RestoreStateData`` (which captured the original ``Store`` at module
|
||||
import) routes its ``async_load`` to the bridge purely because the
|
||||
contextvar is set — no explicit store replacement.
|
||||
@@ -222,6 +229,35 @@ async def test_contextvar_inherits_across_create_task(
|
||||
assert result == {"from": "task"}
|
||||
|
||||
|
||||
async def test_delayed_save_flushes_through_bridge(
|
||||
hass_runtime: HomeAssistant,
|
||||
bridge: _FakeBridge,
|
||||
) -> None:
|
||||
"""``async_delay_save`` + the FINAL_WRITE flush route through the bridge.
|
||||
|
||||
The A2 regression guard. ``async_delay_save`` and the
|
||||
EVENT_HOMEASSISTANT_FINAL_WRITE flush bypass ``async_save`` entirely —
|
||||
they funnel through ``_async_handle_write_data`` -> ``_async_write_data``.
|
||||
The contextvar branch must live at ``_async_write_data`` (not only
|
||||
``async_save``) or these writes would silently land on the sandbox's
|
||||
local disk instead of reaching main. The Phase 8 store subclass
|
||||
overrode ``_async_write_data`` and masked this; deleting it surfaced the
|
||||
gap.
|
||||
"""
|
||||
store = _storage.Store(hass_runtime, 1, "delayed")
|
||||
store.async_delay_save(lambda: {"foo": "bar"}, delay=0)
|
||||
|
||||
# The FINAL_WRITE listener (armed by async_delay_save) flushes the
|
||||
# pending envelope; whichever of it / the delay timer fires first wins
|
||||
# the write lock, the other is a no-op. Either path is _async_write_data.
|
||||
hass_runtime.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
|
||||
await hass_runtime.async_block_till_done()
|
||||
|
||||
assert "delayed" in bridge.saved
|
||||
assert bridge.saved["delayed"]["key"] == "delayed"
|
||||
assert bridge.saved["delayed"]["data"] == {"foo": "bar"}
|
||||
|
||||
|
||||
# --- ChannelSandboxBridge wire mapping (the new sandbox_bridge.py file) ---
|
||||
|
||||
|
||||
|
||||
@@ -127,7 +127,7 @@ async def test_shutdown_returns_restore_state_payload(
|
||||
) -> None:
|
||||
"""The shutdown handler returns a JSON-safe restore-state snapshot.
|
||||
|
||||
Restore state ships back in the reply (not via ``RemoteStore``)
|
||||
Restore state ships back in the reply (not via the store bridge)
|
||||
because the channel reader task is busy dispatching the shutdown
|
||||
handler — a re-entrant ``store_save`` call would deadlock. Main is
|
||||
responsible for persisting the payload before SIGTERM.
|
||||
@@ -165,7 +165,7 @@ async def test_shutdown_fires_final_write_event(
|
||||
|
||||
Concurrent channel dispatcher means the FINAL_WRITE fire-and-drain
|
||||
inside the shutdown handler no longer deadlocks against re-entrant
|
||||
``RemoteStore`` writes triggered by listeners on the event.
|
||||
bridge writes triggered by listeners on the event.
|
||||
"""
|
||||
runtime, main_channel, _task = runtime_pair
|
||||
hass = runtime._flow_runner.hass # noqa: SLF001
|
||||
@@ -189,7 +189,7 @@ async def test_shutdown_flushes_pending_delay_save(
|
||||
runtime_pair: tuple[SandboxRuntime, Channel, asyncio.Task[int]],
|
||||
capsys: pytest.CaptureFixture[str],
|
||||
) -> None:
|
||||
"""Phase 12: ``async_delay_save`` writes flush through ``RemoteStore``.
|
||||
"""Phase 12: ``async_delay_save`` writes flush through the store bridge.
|
||||
|
||||
Without the concurrent channel dispatcher this would deadlock: the
|
||||
Store's FINAL_WRITE listener would re-enter the same channel reader
|
||||
@@ -207,9 +207,10 @@ async def test_shutdown_flushes_pending_delay_save(
|
||||
|
||||
main_channel.register(MSG_STORE_SAVE, _on_store_save)
|
||||
|
||||
# ``install_remote_store`` already patched ``Store`` during ``run()``,
|
||||
# so resolving ``Store`` via the module attribute returns the
|
||||
# ``RemoteStore`` subclass that routes writes at our main channel.
|
||||
# ``run()`` already set ``current_sandbox`` to the channel bridge, so a
|
||||
# vanilla ``Store`` routes its writes to our main channel at IO time —
|
||||
# the FINAL_WRITE flush below funnels through ``_async_write_data``,
|
||||
# which reads the contextvar inside the shutdown handler's task context.
|
||||
store = _storage.Store(hass, 1, "phase12_test")
|
||||
store.async_delay_save(lambda: {"pending": True}, 3600)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user