sandbox_v2: delete RemoteStore; route writes via contextvar (Phase A2)

Phase A2 of plan-sandbox-context: remove the module-level `Store`
rebinding now that the `current_sandbox` contextvar (A1) is the single
source of truth for sandbox Store IO.

Load-bearing correctness fix (surfaced by A1's STATUS): the contextvar
save branch moves DOWN from `Store.async_save` to
`Store._async_write_data`. `async_delay_save` and the FINAL_WRITE flush
bypass `async_save` entirely — they funnel through
`_async_handle_write_data` -> `_async_write_data`. While `RemoteStore`
existed it overrode `_async_write_data` and masked this; deleting it
would have silently routed delayed/final-write saves to the sandbox
tempdir. Branching at `_async_write_data` covers async_save,
async_delay_save, and FINAL_WRITE uniformly. The redundant `async_save`
branch is removed.
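
A minimal sketch of the funnel described above, assuming a toy model
rather than the real `Store`: `ToyStore`, `RecordingBridge`, and
`async_final_write` are hypothetical names, and the toy `async_delay_save`
takes the data directly instead of a `data_func` and a delay. It only
illustrates that a branch at `_async_write_data` also catches the writes
that never pass through `async_save`.

```python
import asyncio
from contextvars import ContextVar
from typing import Any

# Hypothetical stand-in for the real `current_sandbox` contextvar.
current_sandbox: ContextVar[Any] = ContextVar("current_sandbox", default=None)


class RecordingBridge:
    """Toy bridge that records what would be sent to main."""

    def __init__(self) -> None:
        self.saved: list[tuple[str, dict]] = []

    async def async_store_save(self, key: str, data: dict) -> None:
        self.saved.append((key, data))


class ToyStore:
    """Simplified model of the write paths; not the real Store."""

    def __init__(self, key: str) -> None:
        self.key = key
        self._pending: dict | None = None
        self.disk_writes: list[dict] = []

    async def async_save(self, data: dict) -> None:
        self._pending = {"key": self.key, "data": data}
        await self._async_handle_write_data()

    def async_delay_save(self, data: dict) -> None:
        # Only records pending data; nothing is written until a flush.
        self._pending = {"key": self.key, "data": data}

    async def async_final_write(self) -> None:
        # Models the FINAL_WRITE flush, which never calls async_save.
        await self._async_handle_write_data()

    async def _async_handle_write_data(self) -> None:
        data, self._pending = self._pending, None
        if data is not None:
            await self._async_write_data(data)

    async def _async_write_data(self, data: dict) -> None:
        # Single branch point: every write path above ends up here.
        if sandbox := current_sandbox.get():
            await sandbox.async_store_save(self.key, data)
            return
        self.disk_writes.append(data)


async def demo() -> tuple[list, list]:
    bridge = RecordingBridge()
    token = current_sandbox.set(bridge)
    try:
        store = ToyStore("demo")
        await store.async_save({"n": 1})      # immediate save
        store.async_delay_save({"n": 2})      # delayed save ...
        await store.async_final_write()       # ... flushed without async_save
    finally:
        current_sandbox.reset(token)
    return bridge.saved, store.disk_writes


saved, disk_writes = asyncio.run(demo())
```

In this model both writes reach the bridge and nothing lands on the local
"disk" list. Moving the branch up into `async_save` would leave the
delayed write on the local path.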

Deletions:
- `hass_client/remote_store.py` (the subclass + installer)
- `hass_client/tests/test_remote_store.py` (covered by the contextvar
  tests + the new delayed-save regression test)
- the `install_remote_store` call/teardown in `SandboxRuntime.run`
- the explicit `data.store` swap in `_load_restore_state` (the
  contextvar reaches the import-captured `Store` reference)

New regression test `test_delayed_save_flushes_through_bridge` asserts
`async_delay_save` + EVENT_HOMEASSISTANT_FINAL_WRITE route through the
bridge. Docs (CLAUDE.md, OVERVIEW.md, FOLLOWUPS.md, architecture.html)
rewritten around the contextvar.

Tests: 190 core (sandbox_v2 + storage + restore_state) + 50 client all
green; prek clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Paulus
2026-06-03 04:39:34 -04:00
parent 19adbba726
commit 4c85363668
13 changed files with 2918 additions and 530 deletions
@@ -63,8 +63,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Persist the sandbox's restore-state snapshot (Phase 9).
The runtime ships its ``RestoreEntity`` state in the shutdown
reply rather than via ``RemoteStore`` (the reader task is busy
dispatching the shutdown handler — a re-entrant store_save
reply rather than via the sandbox store bridge (the reader task
is busy dispatching the shutdown handler — a re-entrant store_save
would deadlock). We route the payload through the bridge's
store server so it lands at the same path the next run's
warm-load reads from.
@@ -106,8 +106,8 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Stop every sandbox process on HA shutdown.
Phase 9: ask each sandbox to unload its entries and flush
``RestoreEntity`` state through the Phase 8 ``RemoteStore``
before pulling the plug. ``async_stop_all`` then handles SIGTERM
``RestoreEntity`` state through the ``current_sandbox`` store
bridge before pulling the plug. ``async_stop_all`` then handles SIGTERM
/ SIGKILL for any sandbox that didn't ack the graceful request
within the grace.
"""
@@ -56,9 +56,9 @@ Sandbox → Main calls:
Main → Sandbox shutdown (Phase 9):
* ``sandbox_v2/shutdown`` — ask the runtime to unload its entries, dump
``RestoreEntity`` state through the Phase 8 :class:`RemoteStore`, fire
``EVENT_HOMEASSISTANT_FINAL_WRITE`` so any pending Stores flush, and
exit cleanly. Response ``{"ok": True, "unloaded": int, "restored":
``RestoreEntity`` state, fire ``EVENT_HOMEASSISTANT_FINAL_WRITE`` so any
pending Stores flush to main via the ``current_sandbox`` store bridge,
and exit cleanly. Response ``{"ok": True, "unloaded": int, "restored":
int}``. The runtime sets its shutdown event right after writing the
reply, so the subprocess exits 0 on its own — main only needs SIGTERM
if the round-trip times out.
+11 -10
@@ -479,16 +479,6 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
"data": data,
}
if sandbox := current_sandbox.get():
# A sandbox runtime routes the wrapped envelope to main; the
# bridge owns the serialise + transport. Bypass the local-disk
# write machinery (write lock, manager cache, final-write
# listener) entirely.
wrapped = self._data
self._data = None
await sandbox.async_store_save(self.key, wrapped)
return
if self.hass.state is CoreState.stopping:
self._async_ensure_final_write_listener()
return
@@ -608,6 +598,17 @@ class Store[_T: Mapping[str, Any] | Sequence[Any]]:
_LOGGER.error("Error writing config for %s: %s", self.key, err)
async def _async_write_data(self, data: dict) -> None:
if sandbox := current_sandbox.get():
# A sandbox runtime routes the wrapped envelope to main instead
# of writing to local disk. Branching here (rather than in
# async_save) is load-bearing: async_save, async_delay_save, and
# the EVENT_HOMEASSISTANT_FINAL_WRITE flush all funnel their
# writes through _async_handle_write_data -> _async_write_data,
# so a single branch here covers every write path uniformly. The
# bridge owns the envelope normalisation (resolving any pending
# data_func), orjson preserialise, and transport.
await sandbox.async_store_save(self.key, data)
return
if self._serialize_in_event_loop:
if "data_func" in data:
data["data"] = data.pop("data_func")()
+13 -4
@@ -47,8 +47,8 @@ second condition), as a deliberate call relying on git history for rollback.
- `hass_client/` — Python client library (its own `uv` env). Hosts
`SandboxRuntime`, `FlowRunner`, `EntryRunner`, `EntityBridge`,
`ServiceMirror`, `EventMirror`, `RemoteStore`, and the two pytest
plugins under `hass_client/testing/`.
`ServiceMirror`, `EventMirror`, `ChannelSandboxBridge`, and the two
pytest plugins under `hass_client/testing/`.
- `docs/` — per-phase decision write-ups.
- `run_compat.py` + `COMPAT.md` + `COMPAT.csv` — compat-lane runner
and report (Phase 10).
@@ -58,7 +58,7 @@ The HA Core side of the integration lives at
## Core HA files modified (high-review surface)
v2 touches three core HA files. Each is intentional, small, and was
v2 touches four core HA files. Each is intentional, small, and was
introduced by a specific phase — see the matching STATUS file for
the rationale.
@@ -80,10 +80,19 @@ the rationale.
- `homeassistant/auth/models.py` + `auth/__init__.py` +
`auth/auth_store.py` + `components/websocket_api/connection.py`
optional `RefreshToken.scopes` + dispatcher enforcement. **Phase 7.**
- `homeassistant/helpers/sandbox_context.py` (NEW) +
`homeassistant/helpers/storage.py` — the `current_sandbox`
`ContextVar` + `SandboxBridge` `Protocol`, read by `Store`'s IO
methods (`_async_load_data`, `_async_write_data`, `async_remove`) so
sandbox `Store` IO routes to main at call time. This **replaced** the
Phase 8 module-level `Store` rebinding — no more monkey-patch.
**plan-sandbox-context (Phase A1 + A2).**
Iron Law: do **not** monkey-patch private internals. v1's direct
write to `EntityComponent._platforms` is the cautionary tale —
v2 took the slightly bigger PR to add the public hook instead.
v2 took the slightly bigger PR to add the public hook instead. The
Phase 8 `Store` rebinding was the same smell; plan-sandbox-context
replaced it with the declared `current_sandbox` core HA hook.
## Open follow-ups (not yet shipped)
+37 -22
@@ -46,7 +46,7 @@ inside the sandbox.
| Config flow | Forwarded through host integration | Runs inside the sandbox; main owns the canonical `ConfigEntry` store |
| Auth | System-user token, full HA scope | Scoped `RefreshToken` (`{"sandbox_v2/", "auth/current_user"}`); dispatcher rejects out-of-scope calls |
| Data sharing | Sandbox sees all of main's state | Default locked-down; opt-in state/registry/area sharing per group is a future feature ([`docs/design-share-states.md`](docs/design-share-states.md)) |
| Store routing | None — sandbox writes to its own tempdir | `RemoteStore` proxies every `Store(...)` to main; main writes to `<config>/.storage/sandbox_v2/<group>/<key>` |
| Store routing | None — sandbox writes to its own tempdir | The `current_sandbox` contextvar makes `Store` IO proxy to main; main writes to `<config>/.storage/sandbox_v2/<group>/<key>` |
| Shutdown | Best-effort | Graceful `sandbox_v2/shutdown` round-trip; sandbox unloads entries + dumps `RestoreEntity` state; main persists it for next boot |
| Custom integrations | Out of scope | First-class — they route to the `custom` group |
@@ -84,7 +84,7 @@ The design choices and the failure modes of v1 they fix are recorded in
│ │
│ SandboxRuntime │
│ • private HomeAssistant instance │
│ • install_remote_store(channel) — rebinds storage.Store to RemoteStore
│ • current_sandbox.set(bridge) — routes Store IO to main via contextvar
│ • FlowRunner — drives integration ConfigFlow on entry_init / step / abort │
│ • EntryRunner — runs async_setup_entry against the sandbox's hass │
│ • EntityBridge — pushes register_entity + state_changed to main │
@@ -190,12 +190,15 @@ On `EVENT_HOMEASSISTANT_STOP` the integration runs:
4. `manager.async_stop_all()` falls through to SIGTERM, then SIGKILL,
for any sandbox that didn't ack the graceful round-trip.
On the next boot the runtime warm-loads `core.restore_state` via an
explicit `RemoteStore(hass, …, encoder=JSONEncoder)` before any handler
registers, so the first `RestoreEntity.async_get_last_state()` sees the
previous run's state. The warm-load can't go through the storage-module
rebinding because `restore_state.py` captures `Store` at import time —
the explicit `RemoteStore` instance is the workaround.
On the next boot the runtime warm-loads `core.restore_state` before any
handler registers, so the first `RestoreEntity.async_get_last_state()`
sees the previous run's state. It works against a vanilla `Store`: the
runtime sets `current_sandbox` before the warm-load, and `Store`'s IO
methods read the contextvar at call time, so the load routes to main even
though `restore_state.py` captured the original `Store` reference at
import. (Phase 8 needed an explicit sandbox-backed `Store` instance here
because its module-level rebinding couldn't reach that captured
reference; the contextvar made that workaround unnecessary.)
## Config-flow forwarding
@@ -384,20 +387,32 @@ The full decision rationale for the auth side lives in
## Store routing
`RemoteStore` ([`hass_client/remote_store.py`](hass_client/hass_client/remote_store.py))
subclasses `homeassistant.helpers.storage.Store` and overrides the
three IO primitives — `_async_load_data`, `_async_write_data`,
`async_remove` — to talk to main via `sandbox_v2/store_load`,
`sandbox_v2/store_save`, `sandbox_v2/store_remove`. `delay_save`, the
`EVENT_HOMEASSISTANT_FINAL_WRITE` hook, and the migration loop run
unchanged from the base class.
`homeassistant.helpers.storage.Store` reads a `current_sandbox`
`ContextVar` (declared in
[`homeassistant/helpers/sandbox_context.py`](../homeassistant/helpers/sandbox_context.py))
at IO time. When it is set, `Store._async_load_data`,
`Store._async_write_data`, and `Store.async_remove` delegate to the
contextvar's `SandboxBridge` instead of touching local disk. Branching at
`_async_write_data` (not `async_save`) is deliberate: `async_save`,
`async_delay_save`, and the `EVENT_HOMEASSISTANT_FINAL_WRITE` flush all
funnel through `_async_handle_write_data` → `_async_write_data`, so one
branch there covers every write path. The migration loop in
`_async_load_data` runs unchanged regardless of whether the wrapped
envelope came from disk or the bridge.
`install_remote_store(channel)` runs right after the sandbox channel
opens and before any per-runner handler registers. It rebinds
`homeassistant.helpers.storage.Store` to `RemoteStore` for the lifetime
of the sandbox process, so every `Store(...)` instantiated during
`async_setup_entry` is a `RemoteStore`. The patch is process-wide
because one sandbox process hosts one sandbox group.
The sandbox runtime supplies the bridge:
`ChannelSandboxBridge` ([`hass_client/sandbox_bridge.py`](hass_client/hass_client/sandbox_bridge.py))
implements the three `SandboxBridge` store methods over
`sandbox_v2/store_load`, `sandbox_v2/store_save`,
`sandbox_v2/store_remove`. `SandboxRuntime.run` does
`current_sandbox.set(ChannelSandboxBridge(channel))` right after the
channel opens and **before** the warm-load and any per-runner handler
registers, so every coroutine the runtime spawns inherits it (asyncio
copies the context at `create_task` time). One sandbox process hosts one
sandbox group, so a single bridge per runtime is correct. This replaced
the Phase 8 module-level `Store` rebinding — no monkey-patch, and it
reaches helpers like `restore_state` that captured the original `Store`
reference at import.
On main, each `SandboxBridge` owns a `_SandboxStoreServer` pinned to
`<config>/.storage/sandbox_v2/<group>/`. Writes use
@@ -522,7 +537,7 @@ deferred, and what it flagged for the next phase. For a quick map:
| Entity bridge | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py), [`entity/`](../homeassistant/components/sandbox_v2/entity/) | [`entry_runner.py`](hass_client/hass_client/entry_runner.py), [`entity_bridge.py`](hass_client/hass_client/entity_bridge.py) |
| Service/event mirror | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py) | [`service_mirror.py`](hass_client/hass_client/service_mirror.py), [`event_mirror.py`](hass_client/hass_client/event_mirror.py), [`approved_domains.py`](hass_client/hass_client/approved_domains.py) |
| Auth scopes | [`auth.py`](../homeassistant/components/sandbox_v2/auth.py), `homeassistant/auth/models.py`, `homeassistant/components/websocket_api/connection.py` | — |
| Store routing | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py) (`_SandboxStoreServer`) | [`remote_store.py`](hass_client/hass_client/remote_store.py) |
| Store routing | [`bridge.py`](../homeassistant/components/sandbox_v2/bridge.py) (`_SandboxStoreServer`), `homeassistant/helpers/sandbox_context.py`, `homeassistant/helpers/storage.py` | [`sandbox_bridge.py`](hass_client/hass_client/sandbox_bridge.py) |
| Shutdown | [`__init__.py`](../homeassistant/components/sandbox_v2/__init__.py) (`_on_stop`), `manager.py` | [`sandbox.py`](hass_client/hass_client/sandbox.py) (`_run_graceful_shutdown`) |
| Test infra | — | [`testing/`](hass_client/hass_client/testing/), [`run_compat.py`](run_compat.py) |
File diff suppressed because it is too large
+32 -1
@@ -35,7 +35,7 @@ dispatched task, so the reader keeps draining the wire even when the
cap is hit. `SandboxRuntime._run_graceful_shutdown` now fires
`EVENT_HOMEASSISTANT_FINAL_WRITE` (after setting `CoreState.final_write`
and `await hass.async_block_till_done()`) so `delay_save` Stores flush
through `RemoteStore` before the reply goes out.
their pending writes to main before the reply goes out.
**Outcome.** 93 HA-core sandbox_v2 tests + 45 hass_client tests green
(2 new channel tests covering reentrancy + the concurrency cap; 2 new
@@ -253,6 +253,37 @@ v2 test updates. Reports: `COMPAT.md` + `COMPAT_FULL.md` + `BACKLOG.md`
---
## plan-sandbox-context — `current_sandbox` contextvar replaces the store rebinding
**Why.** Phase 8 routed sandbox `Store` IO to main by rebinding
`homeassistant.helpers.storage.Store` at module scope — the `remote_store`
installer swapped in a `Store` subclass for the lifetime of the process. That
is the exact "do not monkey-patch private internals" smell the project's Iron
Law calls out — the same shape v1 was the cautionary tale for. It also had a
footgun: helpers that did `from .storage import Store` at import time
(`restore_state`, the registries) captured the *original* class, so the
rebinding couldn't reach them — `restore_state` needed an explicit per-instance
`Store` swap as a workaround.
**What landed.** A declared core HA hook: `current_sandbox`, a module-level
`ContextVar[SandboxBridge | None]` in
`homeassistant/helpers/sandbox_context.py`, read by `Store._async_load_data`,
`Store._async_write_data`, and `Store.async_remove` at IO time. A contextvar
read inside the instance methods is a single source of truth no matter how
`Store` was imported, so the `restore_state` workaround is gone. The sandbox
runtime sets the contextvar to a `ChannelSandboxBridge` before the warm-load;
asyncio's context copy on `create_task` propagates it to every handler. Shipped
as **A1** (additive — contextvar branch alongside the rebinding) then **A2**
(deleted `remote_store.py`, the installer, and the `restore_state` swap). A2's
load-bearing detail: the save branch lives at `_async_write_data`, not
`async_save`, so `async_delay_save` and the FINAL_WRITE flush — which bypass
`async_save` — route to main too.
**Outcome.** Zero patched globals in the sandbox; `Store` routing is a declared
hook. The "monkey-patch the storage module" tension is closed.
---
## Still open
These are the items that survived Phase 17 — see
@@ -1,210 +0,0 @@
"""Sandbox-side :class:`RemoteStore` — routes ``Store`` IO to main.
Phase 8: the sandbox runtime swaps ``homeassistant.helpers.storage.Store``
with :class:`RemoteStore` once the control channel is ready, so every
``Store(hass, version, key, …)`` call from an integration returns a
RemoteStore that proxies ``async_load`` / ``async_save`` / ``async_remove``
to main. Main namespaces every key as
``<config>/.storage/sandbox_v2/<group>/<key>`` so two sandbox processes —
or main itself — can't read each other's data.
The patch installs lazily: registries that already loaded against the
sandbox-private tempdir (``core.device_registry`` and friends) keep
their local file backing. Integrations that instantiate ``Store`` after
the install — i.e. during ``async_setup_entry`` — get a RemoteStore.
``delay_save`` semantics are unchanged: :class:`RemoteStore` overrides
only the disk-IO primitives (:meth:`_async_load_data`,
:meth:`_async_write_data`, :meth:`async_remove`); batching, the final-
write hook, and the migration loop run unchanged from
:class:`Store`.
"""
from collections.abc import Callable
from copy import deepcopy
import inspect
import json
import logging
from typing import Any, ClassVar
from homeassistant.exceptions import UnsupportedStorageVersionError
from homeassistant.helpers import json as json_helper, storage as _storage
from homeassistant.util.json import SerializationError
from .channel import Channel, ChannelClosedError, ChannelRemoteError
from .protocol import MSG_STORE_LOAD, MSG_STORE_REMOVE, MSG_STORE_SAVE
_LOGGER = logging.getLogger(__name__)
_BaseStore = _storage.Store
class RemoteStore(_BaseStore):
"""``Store`` subclass that persists via :class:`Channel` to main.
Class-level :attr:`_channel` is set by :func:`install_remote_store`
and cleared by the uninstall callable it returns. There is exactly
one channel per sandbox runtime, so a class attribute is enough —
every ``Store`` instance the sandbox builds shares it.
"""
_channel: ClassVar[Channel | None] = None
@classmethod
def _remote_channel(cls) -> Channel | None:
return cls._channel
async def _async_load_data(self) -> Any:
"""Load the wrapped payload from main, then run any migration.
Mirrors :meth:`Store._async_load_data` but bypasses the on-disk
path and the file-cache manager — main is the source of truth.
The migration block matches ``Store`` line-for-line so an
integration that ships a ``_async_migrate_func`` keeps working.
"""
if self._load_empty:
self.make_read_only()
return None
if self._data is not None:
data = self._data
if "data_func" in data:
data["data"] = data.pop("data_func")()
data = deepcopy(data)
else:
wrapped = await self._remote_load()
if wrapped is None:
return None
data = wrapped
if "minor_version" not in data:
data["minor_version"] = 1
if (
data["version"] == self.version
and data["minor_version"] == self.minor_version
):
return data["data"]
if data["version"] > self._max_readable_version:
raise UnsupportedStorageVersionError(
self.key, data["version"], self._max_readable_version
)
_LOGGER.info(
"Migrating %s storage from %s.%s to %s.%s",
self.key,
data["version"],
data["minor_version"],
self.version,
self.minor_version,
)
if len(inspect.signature(self._async_migrate_func).parameters) == 2:
stored = await self._async_migrate_func(data["version"], data["data"])
else:
try:
stored = await self._async_migrate_func(
data["version"], data["minor_version"], data["data"]
)
except NotImplementedError:
if data["version"] != self.version:
raise
stored = data["data"]
await self.async_save(stored)
return stored
async def _remote_load(self) -> dict[str, Any] | None:
"""Fetch the wrapped dict for ``self.key`` from main."""
channel = self._remote_channel()
if channel is None:
_LOGGER.error("RemoteStore[%s]: load before install", self.key)
return None
try:
wrapped = await channel.call(MSG_STORE_LOAD, {"key": self.key})
except ChannelClosedError:
_LOGGER.warning("RemoteStore[%s]: channel closed mid-load", self.key)
return None
except ChannelRemoteError as err:
_LOGGER.warning("RemoteStore[%s] load failed: %s", self.key, err)
return None
if wrapped is None:
return None
if not isinstance(wrapped, dict):
_LOGGER.error(
"RemoteStore[%s]: main returned non-dict (%s)",
self.key,
type(wrapped).__name__,
)
return None
return wrapped
async def _async_write_data(self, data: dict) -> None:
"""Push the wrapped payload to main instead of writing to disk.
``Store`` callers may hand us HA-specific types (``Fragment`` from
``State.json_fragment``, ``set``/``tuple``, ``datetime``, ``Path``,
``as_dict``-shaped objects). The channel transports plain JSON, so
we run the payload through orjson's HA-aware encoder first and
parse the resulting bytes back to primitives before handing it
off. Same trip ``Store.async_save`` would take on its way to disk
— we just intercept before the bytes hit a file.
"""
channel = self._remote_channel()
if channel is None:
_LOGGER.error("RemoteStore[%s]: save before install", self.key)
return
if "data_func" in data:
data["data"] = data.pop("data_func")()
try:
_mode, json_bytes = json_helper.prepare_save_json(data, encoder=None)
payload = json.loads(json_bytes)
except SerializationError:
_LOGGER.exception("RemoteStore[%s]: payload not serialisable", self.key)
return
try:
await channel.call(MSG_STORE_SAVE, {"key": self.key, "data": payload})
except ChannelClosedError:
_LOGGER.warning("RemoteStore[%s]: channel closed mid-save", self.key)
except ChannelRemoteError as err:
_LOGGER.error("RemoteStore[%s] save failed: %s", self.key, err)
async def async_remove(self) -> None:
"""Mirror :meth:`Store.async_remove` but unlink on main, not locally."""
self._manager.async_invalidate(self.key)
self._async_cleanup_delay_listener()
self._async_cleanup_final_write_listener()
channel = self._remote_channel()
if channel is None:
return
try:
await channel.call(MSG_STORE_REMOVE, {"key": self.key})
except ChannelClosedError:
_LOGGER.warning("RemoteStore[%s]: channel closed mid-remove", self.key)
except ChannelRemoteError as err:
_LOGGER.warning("RemoteStore[%s] remove failed: %s", self.key, err)
def install_remote_store(channel: Channel) -> Callable[[], None]:
"""Patch ``homeassistant.helpers.storage.Store`` with :class:`RemoteStore`.
Returns an idempotent uninstall callable that restores the original
``Store`` class. Tests that don't tear the patch down at fixture
teardown will leak the patch into the next test, so always call the
returned uninstaller.
The patch is process-wide on purpose — every ``Store(...)`` call
inside the sandbox routes through ``RemoteStore``. The sandbox
process hosts one sandbox group, so a single class-level
:attr:`RemoteStore._channel` is correct.
"""
RemoteStore._channel = channel # noqa: SLF001 — own class attr
_storage.Store = RemoteStore # type: ignore[misc]
def _uninstall() -> None:
if _storage.Store is RemoteStore:
_storage.Store = _BaseStore # type: ignore[misc]
RemoteStore._channel = None # noqa: SLF001
return _uninstall
__all__ = ["RemoteStore", "install_remote_store"]
+16 -37
@@ -32,7 +32,6 @@ from typing import Any
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE
from homeassistant.core import CoreState
from homeassistant.helpers import json as json_helper, restore_state
from homeassistant.helpers.json import JSONEncoder
from homeassistant.helpers.sandbox_context import current_sandbox
from .approved_domains import ApprovedDomains
@@ -42,7 +41,6 @@ from .entry_runner import EntryRunner
from .event_mirror import EventMirror
from .flow_runner import FlowRunner
from .protocol import MSG_SHUTDOWN
from .remote_store import RemoteStore, install_remote_store
from .sandbox_bridge import ChannelSandboxBridge
from .service_mirror import ServiceMirror
@@ -160,7 +158,6 @@ class SandboxRuntime:
sys.stdout.flush()
self._channel = await self._channel_factory()
uninstall_remote_store: Callable[[], None] | None = None
sandbox_token: Any = None
if self._channel is not None:
# Route every `Store` IO to main via `current_sandbox`. The
@@ -185,21 +182,10 @@ class SandboxRuntime:
"one event loop? (see plan Risk #3)"
)
sandbox_token = current_sandbox.set(ChannelSandboxBridge(self._channel))
# Phase A1 keeps `install_remote_store` alongside the contextvar
# (both paths active). The contextvar branch is the first line of
# each `Store` IO method, so it serves the IO; the rebinding is
# the legacy path A2 deletes once A1 bakes on dev.
#
# Patch `homeassistant.helpers.storage.Store` BEFORE any
# integration imports it: every entry_setup that arrives
# below will instantiate Stores that route to main. Registries
# already created against the sandbox tempdir keep their
# local backing — only post-install instantiations are routed.
uninstall_remote_store = install_remote_store(self._channel)
# Phase 9: start the channel reader first so the warm-load
# round-trip can resolve, then pre-load this sandbox group's
# restore-state cache via the freshly-installed RemoteStore.
# The data lives on main under
# restore-state cache. The contextvar (set above) routes the
# load to main. The data lives on main under
# ``.storage/sandbox_v2/<group>/core.restore_state`` and was
# written by the previous run's shutdown handler. Bare HA —
# no bootstrap — so we call it ourselves; any RestoreEntity
@@ -229,8 +215,6 @@ class SandboxRuntime:
await self._entity_bridge.async_stop()
if self._channel is not None:
await self._channel.close()
if uninstall_remote_store is not None:
uninstall_remote_store()
if sandbox_token is not None:
# Tidy test isolation; in prod the process exits anyway.
current_sandbox.reset(sandbox_token)
@@ -262,12 +246,13 @@ class SandboxRuntime:
Phase 12 fires ``EVENT_HOMEASSISTANT_FINAL_WRITE`` and waits for
the bus to drain so ``Store``s with pending ``async_delay_save``
writes flush to main via ``RemoteStore`` — the now-concurrent
channel dispatcher means the re-entrant ``MSG_STORE_SAVE`` call
each flush issues no longer deadlocks against this handler.
writes flush to main via the ``current_sandbox`` bridge — the
now-concurrent channel dispatcher means the re-entrant
``MSG_STORE_SAVE`` call each flush issues no longer deadlocks
against this handler.
Restore state is still **collected** (not flushed via
``RemoteStore``) and returned in this reply: ``core.restore_state``
Restore state is still **collected** (not flushed via the
bridge) and returned in this reply: ``core.restore_state``
is owned by the runtime's explicit warm-load / shutdown-dump path,
not by an integration's ``Store``, so it doesn't ride the
FINAL_WRITE flush. Shipping it back in the reply keeps the data
@@ -299,8 +284,8 @@ class SandboxRuntime:
# Phase 12: fire FINAL_WRITE so ``async_delay_save``-using
# ``Store``s flush their pending data. Concurrent channel
# dispatcher means each ``RemoteStore`` write can re-enter the
# channel without deadlocking against this handler.
# dispatcher means each bridge write can re-enter the channel
# without deadlocking against this handler.
try:
hass.set_state(CoreState.final_write)
hass.bus.async_fire_internal(EVENT_HOMEASSISTANT_FINAL_WRITE)
@@ -351,20 +336,14 @@ async def _load_restore_state(hass: Any) -> None:
through ``async_start``, so we skip that listener and rely on
Phase 9's shutdown handler to force the final dump.
``RestoreStateData`` imports ``Store`` via ``from .storage import
Store`` at module-load time, so Phase 8's
:func:`install_remote_store` (which rebinds
``helpers.storage.Store``) cannot reach it. We swap the singleton's
``store`` attribute with a :class:`RemoteStore` explicitly so the
load — and the later shutdown dump — round-trip through main.
No store swap is needed: ``RestoreStateData`` builds a vanilla
``Store``, and ``Store.async_load`` reads ``current_sandbox`` at call
time. Because the runtime set the contextvar before calling us, the
load, and the later shutdown dump, round-trip through main even
though ``restore_state.py`` captured the original ``Store`` reference at
import time.
"""
data = restore_state.async_get(hass)
data.store = RemoteStore(
hass,
restore_state.STORAGE_VERSION,
restore_state.STORAGE_KEY,
encoder=JSONEncoder,
)
try:
await data.async_load()
except Exception:
@@ -6,8 +6,8 @@ control channel: the three ``Store`` IO methods delegate to main via the
namespaces every key as ``<config>/.storage/sandbox_v2/<group>/<key>`` so
two sandbox processes — or main itself — can't read each other's data.
The bodies are lifted from the pre-contextvar :class:`RemoteStore` (which
this primitive replaces): same load semantics, same orjson preserialise on
The bodies are lifted from the pre-contextvar Phase 8 store subclass that
this primitive replaced: same load semantics, same orjson preserialise on
save, same channel error handling. The difference is *how* it's wired —
``Store`` reads ``current_sandbox`` at call time instead of being rebound
at module scope.
@@ -1,218 +0,0 @@
"""Phase 8 tests for :class:`hass_client.remote_store.RemoteStore`.
Each test runs the patched-in ``Store`` against an in-memory channel
pair that pretends to be main. We assert RemoteStore proxies all three
operations correctly, runs migration when versions differ, and that the
install/uninstall pair restores ``homeassistant.helpers.storage.Store``
to its original value.
"""
import asyncio
import tempfile
from typing import Any
from hass_client.channel import Channel
from hass_client.flow_runner import FlowRunner
from hass_client.remote_store import RemoteStore, install_remote_store
import pytest
from homeassistant.helpers import storage as _storage
class _LoopbackWriter:
def __init__(self, target: asyncio.StreamReader) -> None:
self._target = target
def write(self, data: bytes) -> None:
self._target.feed_data(data)
async def drain(self) -> None:
return None
def close(self) -> None:
self._target.feed_eof()
async def wait_closed(self) -> None:
return None
def _make_channel_pair() -> tuple[Channel, Channel]:
reader_a = asyncio.StreamReader()
reader_b = asyncio.StreamReader()
return (
Channel(reader_a, _LoopbackWriter(reader_b), name="main"), # type: ignore[arg-type]
Channel(reader_b, _LoopbackWriter(reader_a), name="sandbox"), # type: ignore[arg-type]
)
@pytest.fixture(name="channels")
async def _channels_fixture() -> tuple[Channel, Channel]:
main, sandbox = _make_channel_pair()
yield main, sandbox
await main.close()
await sandbox.close()
@pytest.fixture(name="hass_runtime")
async def _hass_runtime_fixture():
with tempfile.TemporaryDirectory(prefix="sandbox_v2_remote_store_") as tmp:
flow_runner = await FlowRunner.create(config_dir=tmp)
try:
yield flow_runner.hass
finally:
await flow_runner.async_stop()
@pytest.fixture(name="installed_store")
def _installed_store_fixture(channels: tuple[Channel, Channel]):
"""Install RemoteStore on the channel pair and tear it down afterwards."""
_main, sandbox = channels
uninstall = install_remote_store(sandbox)
yield sandbox
uninstall()
async def test_install_replaces_store_symbol(channels: tuple[Channel, Channel]) -> None:
"""``install_remote_store`` patches ``storage.Store`` and uninstall restores it."""
_main, sandbox = channels
original = _storage.Store
uninstall = install_remote_store(sandbox)
try:
assert _storage.Store is RemoteStore
finally:
uninstall()
assert _storage.Store is original
assert RemoteStore._channel is None # noqa: SLF001
-async def test_save_then_load_round_trip(
-    channels: tuple[Channel, Channel],
-    hass_runtime: Any,
-    installed_store: Channel,
-) -> None:
-    """A round-trip through a fake main returns the saved data."""
-    main, _sandbox = channels
-    saved: dict[str, Any] = {}
-    async def _on_save(payload: dict[str, Any]) -> dict[str, bool]:
-        saved[payload["key"]] = payload["data"]
-        return {"ok": True}
-    async def _on_load(payload: dict[str, Any]) -> dict[str, Any] | None:
-        return saved.get(payload["key"])
-    main.register("sandbox_v2/store_save", _on_save)
-    main.register("sandbox_v2/store_load", _on_load)
-    main.start()
-    installed_store.start()
-    store = _storage.Store(hass_runtime, 1, "phase8_demo")
-    await store.async_save({"hello": "world"})
-    assert saved["phase8_demo"]["data"] == {"hello": "world"}
-    assert saved["phase8_demo"]["version"] == 1
-    # Fresh Store with no in-memory data must round-trip through main.
-    other = _storage.Store(hass_runtime, 1, "phase8_demo")
-    loaded = await other.async_load()
-    assert loaded == {"hello": "world"}
-async def test_load_returns_none_when_main_has_no_data(
-    channels: tuple[Channel, Channel],
-    hass_runtime: Any,
-    installed_store: Channel,
-) -> None:
-    """A missing key reads back as ``None`` (matching ``Store`` semantics)."""
-    main, _sandbox = channels
-    async def _on_load(_payload: dict[str, Any]) -> None:
-        return None
-    main.register("sandbox_v2/store_load", _on_load)
-    main.start()
-    installed_store.start()
-    store = _storage.Store(hass_runtime, 1, "missing_key")
-    assert await store.async_load() is None
-async def test_remove_proxies_to_main(
-    channels: tuple[Channel, Channel],
-    hass_runtime: Any,
-    installed_store: Channel,
-) -> None:
-    """``async_remove`` fires a ``sandbox_v2/store_remove`` RPC."""
-    main, _sandbox = channels
-    removed: list[str] = []
-    async def _on_remove(payload: dict[str, Any]) -> dict[str, bool]:
-        removed.append(payload["key"])
-        return {"ok": True}
-    main.register("sandbox_v2/store_remove", _on_remove)
-    main.start()
-    installed_store.start()
-    store = _storage.Store(hass_runtime, 1, "drop_me")
-    await store.async_remove()
-    assert removed == ["drop_me"]
-async def test_migration_runs_when_version_differs(
-    channels: tuple[Channel, Channel],
-    hass_runtime: Any,
-    installed_store: Channel,
-) -> None:
-    """A wrapped v1 payload prompts the subclass's migrate_func + a write back."""
-    main, _sandbox = channels
-    saved: dict[str, Any] = {}
-    async def _on_save(payload: dict[str, Any]) -> dict[str, bool]:
-        saved[payload["key"]] = payload["data"]
-        return {"ok": True}
-    async def _on_load(_payload: dict[str, Any]) -> dict[str, Any]:
-        return {
-            "version": 1,
-            "minor_version": 1,
-            "key": "phase8_migrating",
-            "data": {"shape": "old"},
-        }
-    main.register("sandbox_v2/store_save", _on_save)
-    main.register("sandbox_v2/store_load", _on_load)
-    main.start()
-    installed_store.start()
-    class _MigratingStore(_storage.Store):
-        async def _async_migrate_func(
-            self,
-            old_major_version: int,
-            old_minor_version: int,
-            old_data: dict[str, Any],
-        ) -> dict[str, Any]:
-            return {"shape": "new", "was": old_data["shape"]}
-    store = _MigratingStore(hass_runtime, 2, "phase8_migrating")
-    loaded = await store.async_load()
-    assert loaded == {"shape": "new", "was": "old"}
-    # Migration also wrote the post-migration shape back to main.
-    assert saved["phase8_migrating"]["version"] == 2
-    assert saved["phase8_migrating"]["data"] == {"shape": "new", "was": "old"}
-async def test_load_with_channel_uninstalled_returns_none(
-    channels: tuple[Channel, Channel],
-    hass_runtime: Any,
-) -> None:
-    """A RemoteStore without an installed channel doesn't deadlock."""
-    # Don't install — RemoteStore._channel stays None.
-    main, sandbox = channels
-    main.start()
-    sandbox.start()
-    # Manually construct a RemoteStore (the patched Store path is what the
-    # install fixture exercises; this case is the diagnostic-only path).
-    store = RemoteStore(hass_runtime, 1, "orphan")
-    assert await store.async_load() is None
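Reviewer note: the deleted tests above exercise the module-attribute rebinding that this commit removes. The sketch below is a toy model of why such a rebinding cannot reach a reference that was captured at import time, which is the gap the contextvar closes. Every name in it (`toy_storage`, `Store`, `RemoteStore`, `backend`) is a hypothetical stand-in, not Home Assistant's or `hass_client`'s real code.

```python
import types

# Hypothetical stand-in for a storage module; not Home Assistant's real code.
storage = types.ModuleType("toy_storage")


class Store:
    """Original class; in this toy model it represents local-disk IO."""

    backend = "disk"


class RemoteStore(Store):
    """Replacement class installed by rebinding the module attribute."""

    backend = "remote"


storage.Store = Store

# A consumer that ran ``from toy_storage import Store`` at import time keeps
# its own reference to the original class.
captured_store = storage.Store

# "Install" the replacement by rebinding the module attribute.
storage.Store = RemoteStore

late_lookup = storage.Store().backend     # attribute resolved after the rebind
early_capture = captured_store().backend  # reference captured before the rebind
```

In this model `late_lookup` sees the replacement while `early_capture` still uses the original class, which mirrors why the explicit `data.store` swap was needed under the old approach.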
@@ -1,17 +1,18 @@
-"""Phase A1 tests for the ``current_sandbox`` contextvar store routing.
+"""Tests for the ``current_sandbox`` contextvar store routing.
-These exercise the new routing primitive that replaces the module-level
-``Store`` rebinding (``install_remote_store`` / :class:`RemoteStore`):
+These exercise the routing primitive that replaced the former
+module-level ``Store`` rebinding (the deleted ``remote_store`` module):
-* The first five tests drive ``Store``'s public API through the contextvar
-  branch using an in-memory ``_FakeBridge`` set on ``current_sandbox``
-  directly — no channel. They map 1:1 to the plan's "Phase A1 — Tests
-  added" list.
+* Most tests drive ``Store``'s public API through the contextvar branch
+  using an in-memory ``_FakeBridge`` set on ``current_sandbox`` directly —
+  no channel. They cover load/unwrap, missing keys, migration, the
+  no-sandbox disk path, the ``restore_state`` warm-load, contextvar task
+  inheritance, and (the A2 guard) the ``async_delay_save`` / FINAL_WRITE
+  flush.
 * The final test exercises the concrete :class:`ChannelSandboxBridge` over
-  an in-memory channel pair, covering the wire mapping of the new
-  ``sandbox_bridge.py`` file directly (in A1 the same mapping is still
-  covered transitively by ``test_remote_store.py``; A2 keeps this one when
-  it deletes that file).
+  an in-memory channel pair, covering the wire mapping of
+  ``sandbox_bridge.py`` directly — the coverage ``test_remote_store.py``
+  used to provide transitively before A2 deleted it.
 """
 import asyncio
@@ -24,6 +25,7 @@ from hass_client.flow_runner import FlowRunner
 from hass_client.sandbox_bridge import ChannelSandboxBridge
 import pytest
+from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE
 from homeassistant.core import HomeAssistant
 from homeassistant.helpers import restore_state, storage as _storage
 from homeassistant.helpers.sandbox_context import current_sandbox
@@ -48,6 +50,11 @@ class _FakeBridge:
         return self.loaded.get(key)
     async def async_store_save(self, key: str, data: Any) -> None:
+        # Mirror ``ChannelSandboxBridge``: resolve a deferred ``data_func``
+        # (handed down by ``async_delay_save``) into a concrete ``data`` key
+        # before recording the envelope.
+        if "data_func" in data:
+            data["data"] = data.pop("data_func")()
         self.saved[key] = data
     async def async_store_remove(self, key: str) -> None:
@@ -183,8 +190,8 @@ async def test_restore_state_warm_load_without_workaround(
 ) -> None:
     """``RestoreStateData`` warm-load reaches the bridge with no store swap.
-    The smoking gun that the contextvar fix subsumes the
-    ``data.store = RemoteStore(...)`` workaround in ``sandbox.py``: a vanilla
+    The smoking gun that the contextvar fix subsumes the explicit
+    ``data.store`` swap A2 deleted from ``sandbox.py``: a vanilla
     ``RestoreStateData`` (which captured the original ``Store`` at module
     import) routes its ``async_load`` to the bridge purely because the
     contextvar is set — no explicit store replacement.
@@ -222,6 +229,35 @@ async def test_contextvar_inherits_across_create_task(
     assert result == {"from": "task"}
+async def test_delayed_save_flushes_through_bridge(
+    hass_runtime: HomeAssistant,
+    bridge: _FakeBridge,
+) -> None:
+    """``async_delay_save`` + the FINAL_WRITE flush route through the bridge.
+    The A2 regression guard. ``async_delay_save`` and the
+    EVENT_HOMEASSISTANT_FINAL_WRITE flush bypass ``async_save`` entirely —
+    they funnel through ``_async_handle_write_data`` -> ``_async_write_data``.
+    The contextvar branch must live at ``_async_write_data`` (not only
+    ``async_save``) or these writes would silently land on the sandbox's
+    local disk instead of reaching main. The Phase 8 store subclass
+    overrode ``_async_write_data`` and masked this; deleting it surfaced the
+    gap.
+    """
+    store = _storage.Store(hass_runtime, 1, "delayed")
+    store.async_delay_save(lambda: {"foo": "bar"}, delay=0)
+    # The FINAL_WRITE listener (armed by async_delay_save) flushes the
+    # pending envelope; whichever of it / the delay timer fires first wins
+    # the write lock, the other is a no-op. Either path is _async_write_data.
+    hass_runtime.bus.async_fire(EVENT_HOMEASSISTANT_FINAL_WRITE)
+    await hass_runtime.async_block_till_done()
+    assert "delayed" in bridge.saved
+    assert bridge.saved["delayed"]["key"] == "delayed"
+    assert bridge.saved["delayed"]["data"] == {"foo": "bar"}
 # --- ChannelSandboxBridge wire mapping (the new sandbox_bridge.py file) ---
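Reviewer note: the regression test above depends on the routing branch sitting at the lowest shared write funnel. The following is a minimal sketch of that idea under stated assumptions: `ToyStore`, `current_bridge`, `async_flush`, and `local_disk` are hypothetical names for a toy model, and only the method names `async_save`, `async_delay_save`, and `_async_write_data` echo the ones discussed in this commit. It is not Home Assistant's `Store` implementation.

```python
import asyncio
import contextvars
from typing import Any, Callable, Optional

# Hypothetical names throughout; a toy model, not Home Assistant's Store.
current_bridge: contextvars.ContextVar[Optional[dict]] = contextvars.ContextVar(
    "current_bridge", default=None
)
local_disk: dict = {}


class ToyStore:
    def __init__(self, key: str) -> None:
        self.key = key
        self._pending: Optional[Callable[[], Any]] = None

    async def async_save(self, data: Any) -> None:
        """Immediate save: goes straight to the shared write funnel."""
        await self._async_write_data(data)

    def async_delay_save(self, data_func: Callable[[], Any]) -> None:
        """Deferred save: records the callback and never calls async_save."""
        self._pending = data_func

    async def async_flush(self) -> None:
        """Final-write style flush of whatever is pending."""
        if self._pending is not None:
            data_func, self._pending = self._pending, None
            await self._async_write_data(data_func())

    async def _async_write_data(self, data: Any) -> None:
        """Single funnel: branching here covers every write path."""
        bridge = current_bridge.get()
        if bridge is not None:
            bridge[self.key] = data
        else:
            local_disk[self.key] = data


async def demo() -> tuple:
    bridge: dict = {}
    current_bridge.set(bridge)
    store = ToyStore("delayed")
    store.async_delay_save(lambda: {"foo": "bar"})
    # A task created here copies the current context, so the flush
    # still sees the bridge set above.
    await asyncio.create_task(store.async_flush())
    return bridge, local_disk


bridge_writes, disk_writes = asyncio.run(demo())
```

Had the branch lived only in `async_save`, the deferred path in this model would have written to `local_disk`, which is the failure mode the commit message describes.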
@@ -127,7 +127,7 @@ async def test_shutdown_returns_restore_state_payload(
 ) -> None:
     """The shutdown handler returns a JSON-safe restore-state snapshot.
-    Restore state ships back in the reply (not via ``RemoteStore``)
+    Restore state ships back in the reply (not via the store bridge)
     because the channel reader task is busy dispatching the shutdown
     handler — a re-entrant ``store_save`` call would deadlock. Main is
     responsible for persisting the payload before SIGTERM.
@@ -165,7 +165,7 @@ async def test_shutdown_fires_final_write_event(
     Concurrent channel dispatcher means the FINAL_WRITE fire-and-drain
     inside the shutdown handler no longer deadlocks against re-entrant
-    ``RemoteStore`` writes triggered by listeners on the event.
+    bridge writes triggered by listeners on the event.
     """
     runtime, main_channel, _task = runtime_pair
     hass = runtime._flow_runner.hass  # noqa: SLF001
@@ -189,7 +189,7 @@ async def test_shutdown_flushes_pending_delay_save(
     runtime_pair: tuple[SandboxRuntime, Channel, asyncio.Task[int]],
     capsys: pytest.CaptureFixture[str],
 ) -> None:
-    """Phase 12: ``async_delay_save`` writes flush through ``RemoteStore``.
+    """Phase 12: ``async_delay_save`` writes flush through the store bridge.
     Without the concurrent channel dispatcher this would deadlock: the
     Store's FINAL_WRITE listener would re-enter the same channel reader
@@ -207,9 +207,10 @@ async def test_shutdown_flushes_pending_delay_save(
     main_channel.register(MSG_STORE_SAVE, _on_store_save)
-    # ``install_remote_store`` already patched ``Store`` during ``run()``,
-    # so resolving ``Store`` via the module attribute returns the
-    # ``RemoteStore`` subclass that routes writes at our main channel.
+    # ``run()`` already set ``current_sandbox`` to the channel bridge, so a
+    # vanilla ``Store`` routes its writes to our main channel at IO time —
+    # the FINAL_WRITE flush below funnels through ``_async_write_data``,
+    # which reads the contextvar inside the shutdown handler's task context.
     store = _storage.Store(hass, 1, "phase12_test")
     store.async_delay_save(lambda: {"pending": True}, 3600)