sandbox_v2: Transport/Codec seam (transport T1)

Split the control channel into three layers so the wire format and the
byte transport can each be swapped without touching the concurrency-
critical dispatch core:

* Channel: dispatch core (pending map, inflight semaphore, register/
  call/push/close); speaks Frame objects, never raw bytes.
* Codec (Protocol) + JsonCodec: Frame <-> bytes. JsonCodec emits the
  same JSON objects as the old wire shape, minus the trailing newline
  that the length prefix replaces.
* Transport (Protocol) + StreamTransport: whole frame blobs over a
  reader/writer pair using a 4-byte big-endian length prefix. A prefix
  announcing more than 16 MiB aborts the channel. Channel.from_transport
  is the drop-in seam for a future WebSocketTransport.
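
As a rough illustration of the length-prefixed framing that StreamTransport
uses, the sketch below works on an in-memory byte buffer rather than an
asyncio stream. The helper names (pack_frame, unpack_frames, FrameTooLarge)
are hypothetical and not part of this commit. The sender-side size check is
an assumption of the sketch; the diff only shows the check on the reading
side.

```python
import struct

# Prefix layout and cap as described in the commit message.
LENGTH_PREFIX = struct.Struct(">I")  # 4-byte big-endian unsigned length
MAX_FRAME_SIZE = 16 * 1024 * 1024  # 16 MiB


class FrameTooLarge(Exception):
    """Hypothetical error: a length exceeds MAX_FRAME_SIZE."""


def pack_frame(body: bytes) -> bytes:
    """Return ``body`` preceded by its 4-byte length."""
    # Assumption: reject oversized frames before they hit the wire.
    if len(body) > MAX_FRAME_SIZE:
        raise FrameTooLarge(f"frame length {len(body)} exceeds cap")
    return LENGTH_PREFIX.pack(len(body)) + body


def unpack_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """Split complete frames off ``buffer``; return (frames, leftover bytes)."""
    frames: list[bytes] = []
    offset = 0
    while len(buffer) - offset >= LENGTH_PREFIX.size:
        (length,) = LENGTH_PREFIX.unpack_from(buffer, offset)
        if length > MAX_FRAME_SIZE:
            # Check the announced length before reading any body bytes.
            raise FrameTooLarge(f"frame length {length} exceeds cap")
        start = offset + LENGTH_PREFIX.size
        if len(buffer) - start < length:
            break  # body not fully received yet
        frames.append(buffer[start : start + length])
        offset = start + length
    return frames, buffer[offset:]
```

Checking the announced length before reading the body is what keeps a
misbehaving peer from forcing a large allocation.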

Replaces the stdout text marker (sandbox_v2:ready) with a MSG_READY
*frame* sent as the channel's first message; the manager registers a
handler for it and flips to "running" on arrival, so stdout now carries
nothing but channel frames. Net behavior identical — still JSON, still
stdio — only the framing and handshake changed.
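
The handshake can be pictured as a race between the ready frame and the
process exiting. The sketch below is a simplified stand-in for the manager
logic, not the code in this commit: wait_until_ready and its register
parameter are hypothetical, and the process exit is modelled as a plain
asyncio future.

```python
import asyncio
from collections.abc import Awaitable, Callable

MSG_READY = "sandbox_v2/ready"  # value taken from protocol.py in this commit

Handler = Callable[[object], Awaitable[None]]


async def wait_until_ready(
    register: Callable[[str, Handler], None],
    process_exit: asyncio.Future,
) -> bool:
    """Return True if the ready frame arrives before the process exits."""
    ready = asyncio.Event()

    async def on_ready(_payload: object) -> None:
        ready.set()

    # Register before anything is read so the first frame is never dropped.
    register(MSG_READY, on_ready)
    ready_task = asyncio.create_task(ready.wait())
    try:
        await asyncio.wait(
            {ready_task, process_exit}, return_when=asyncio.FIRST_COMPLETED
        )
        return ready_task.done() and not ready_task.cancelled()
    finally:
        if not ready_task.done():
            ready_task.cancel()


async def demo() -> tuple[bool, bool]:
    """Run both outcomes against a dict standing in for the handler table."""
    handlers: dict[str, Handler] = {}
    loop = asyncio.get_running_loop()

    # Outcome 1: the ready frame is dispatched while the process is alive.
    waiter = asyncio.create_task(
        wait_until_ready(handlers.__setitem__, loop.create_future())
    )
    await asyncio.sleep(0)  # let the waiter register its handler
    await handlers[MSG_READY](None)
    started = await waiter

    # Outcome 2: the process has already exited and no frame ever arrives.
    exited = loop.create_future()
    exited.set_result(1)
    crashed = await wait_until_ready(handlers.__setitem__, exited)
    return started, crashed
```

Calling asyncio.run(demo()) exercises both paths: the first result
corresponds to flipping to "running", the second to a runtime that died
before sending its first frame.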

Both channel.py mirrors and protocol.py mirrors updated in lockstep.
Handshake/marker test assertions updated; added coverage for the
from_transport seam via an in-memory queue transport.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Paulus
2026-06-03 06:23:36 -04:00
parent a0732f3e09
commit 8389f7ad96
9 changed files with 753 additions and 223 deletions
+324 -77
@@ -1,32 +1,54 @@
"""JSON-line request/response channel between manager and sandbox runtime.
"""Request/response channel between manager and sandbox runtime.
The wire format is intentionally trivial — one JSON object per line:
The channel is split into three layers so the wire format and the byte
transport can each be swapped without touching the concurrency-critical
dispatch core:
* **request** (call): ``{"id": int, "type": str, "payload": Any}``
* **response**: ``{"id": int, "ok": bool, "result": Any}``
or ``{"id": int, "ok": false, "error": str, "error_type": str}``
* **push** (one-way): ``{"type": str, "payload": Any}`` — no ``id``, no reply
* :class:`Channel` — the dispatch core: pending-id map, inflight
semaphore, ``register`` / ``call`` / ``push`` / ``close``. It speaks in
:class:`Frame` objects and never touches raw bytes.
* :class:`Codec` — turns a :class:`Frame` into bytes and back.
:class:`JsonCodec` is the line-compatible default (one JSON object per
frame); :mod:`.codec_protobuf` adds the protobuf wire on top of the same
seam.
* :class:`Transport` — moves whole frame blobs over some byte channel.
:class:`StreamTransport` length-prefixes each frame (4-byte big-endian
length + body) over an :class:`asyncio.StreamReader` /
:class:`asyncio.StreamWriter` pair (stdio, unix socket). A future
``WebSocketTransport`` drops in via :meth:`Channel.from_transport` using
aiohttp's native binary framing.
Each side wraps its inbound/outbound byte streams in a :class:`Channel`. The
channel is symmetric: either side may call or be called on. The same class
runs in the HA Core integration and inside the sandbox subprocess (the
sandbox side lives at :mod:`hass_client.channel`; the two are kept in sync
by the protocol shape rather than a shared import — the integration must
not depend on ``hass_client``).
The :class:`Frame` shape mirrors the three message kinds that cross the
wire:
Inbound calls and pushes are dispatched in their own tasks so a handler that
itself issues :meth:`Channel.call` does not block the reader — the reply for
the nested call has to come back through the same reader. A bounded
semaphore caps how many handlers can run concurrently; the N+1th inbound
message queues at the semaphore (not at the reader) until a slot frees up.
* **call**: ``id`` (>0), ``type``, ``payload`` — expects a reply
* **push**: ``id`` 0, ``type``, ``payload`` — one-way, no reply
* **response**: ``id`` (>0), ``ok``, and either ``result`` or
``error`` / ``error_type`` / ``error_data``
The channel is symmetric: either side may call or be called on. The same
class runs in the HA Core integration and inside the sandbox subprocess
(the sandbox side lives at :mod:`hass_client.channel`; the two are kept in
sync by the protocol shape rather than a shared import — the integration
must not depend on ``hass_client``).
Inbound calls and pushes are dispatched in their own tasks so a handler
that itself issues :meth:`Channel.call` does not block the reader — the
reply for the nested call has to come back through the same reader. A
bounded semaphore caps how many handlers can run concurrently; the N+1th
inbound message queues at the semaphore (not at the reader) until a slot
frees up.
"""
import asyncio
from collections.abc import Awaitable, Callable, Coroutine
import contextlib
from dataclasses import dataclass, field
from enum import StrEnum
import json
import logging
from typing import Any
import struct
from typing import Any, Protocol
import voluptuous as vol
@@ -36,6 +58,13 @@ Handler = Callable[[Any], Awaitable[Any]]
DEFAULT_MAX_INFLIGHT = 16
# Hard cap on a single frame's body. A length prefix larger than this aborts
# the channel rather than letting a compromised sandbox allocate the host to
# death (same hardening spirit as the auth key check).
MAX_FRAME_SIZE = 16 * 1024 * 1024
_LENGTH_PREFIX = struct.Struct(">I")
def _serialize_invalid(err: vol.Invalid) -> dict[str, Any]:
"""Capture a ``vol.Invalid``'s message + path so the peer can rebuild it.
@@ -66,6 +95,188 @@ def error_data_for(err: BaseException) -> dict[str, Any] | None:
return None
class FrameKind(StrEnum):
"""Which of the three wire shapes a :class:`Frame` carries."""
CALL = "call"
PUSH = "push"
RESPONSE = "response"
@dataclass(slots=True)
class Frame:
"""Transport/codec-neutral representation of one wire message."""
kind: FrameKind
id: int = 0
type: str = ""
payload: Any = None
ok: bool = False
result: Any = None
error: str | None = None
error_type: str | None = None
error_data: dict[str, Any] | None = field(default=None)
@classmethod
def call(cls, call_id: int, msg_type: str, payload: Any) -> Frame:
"""Build a request frame that expects a reply."""
return cls(FrameKind.CALL, id=call_id, type=msg_type, payload=payload)
@classmethod
def push(cls, msg_type: str, payload: Any) -> Frame:
"""Build a one-way push frame."""
return cls(FrameKind.PUSH, id=0, type=msg_type, payload=payload)
@classmethod
def ok_response(cls, call_id: int, result: Any) -> Frame:
"""Build a success response frame."""
return cls(FrameKind.RESPONSE, id=call_id, ok=True, result=result)
@classmethod
def error_response(
cls,
call_id: int,
error: str,
error_type: str | None,
error_data: dict[str, Any] | None = None,
) -> Frame:
"""Build a failure response frame."""
return cls(
FrameKind.RESPONSE,
id=call_id,
ok=False,
error=error,
error_type=error_type,
error_data=error_data,
)
class Codec(Protocol):
"""Serialises a :class:`Frame` to bytes and back."""
def encode(self, frame: Frame) -> bytes:
"""Return the wire bytes for ``frame``."""
def decode(self, data: bytes) -> Frame:
"""Rebuild a :class:`Frame` from wire bytes."""
class JsonCodec:
"""One-JSON-object-per-frame codec.
Line-compatible with the original wire shape (sans the trailing
newline, which the length prefix replaces). Kept as the default for
tests and debugging; production rides :class:`ProtobufCodec`.
"""
def encode(self, frame: Frame) -> bytes:
"""Encode a frame to a compact JSON object."""
message: dict[str, Any]
if frame.kind is FrameKind.CALL:
message = {"id": frame.id, "type": frame.type, "payload": frame.payload}
elif frame.kind is FrameKind.PUSH:
message = {"type": frame.type, "payload": frame.payload}
elif frame.ok:
message = {"id": frame.id, "ok": True, "result": frame.result}
else:
message = {
"id": frame.id,
"ok": False,
"error": frame.error,
"error_type": frame.error_type,
}
if frame.error_data is not None:
message["error_data"] = frame.error_data
return json.dumps(message, separators=(",", ":")).encode("utf-8")
def decode(self, data: bytes) -> Frame:
"""Decode a JSON object into a frame, inferring the kind from keys."""
message = json.loads(data)
has_id = "id" in message
has_type = "type" in message
if has_id and not has_type:
# Response to a call we sent out.
if message.get("ok"):
return Frame.ok_response(message["id"], message.get("result"))
return Frame.error_response(
message["id"],
message.get("error", "unknown error"),
message.get("error_type"),
message.get("error_data"),
)
if not has_id:
return Frame.push(message.get("type", ""), message.get("payload"))
return Frame.call(message["id"], message["type"], message.get("payload"))
class Transport(Protocol):
"""Moves whole frame blobs over some byte channel."""
async def read_frame(self) -> bytes | None:
"""Return the next frame's bytes, or ``None`` at end-of-stream."""
async def write_frame(self, data: bytes) -> None:
"""Write one frame's bytes."""
def close(self) -> None:
"""Begin closing the underlying channel."""
async def wait_closed(self) -> None:
"""Wait for the underlying channel to finish closing."""
class FrameTooLargeError(Exception):
"""A peer announced a frame larger than :data:`MAX_FRAME_SIZE`."""
class StreamTransport:
"""Length-prefixed framing over a reader/writer pair.
Each frame is a 4-byte big-endian length followed by exactly that many
body bytes. Used for stdio and unix-socket connections — anywhere the
byte channel is an :class:`asyncio.StreamReader` /
:class:`asyncio.StreamWriter` pair.
"""
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
"""Wrap a reader/writer pair with length-prefixed framing."""
self._reader = reader
self._writer = writer
async def read_frame(self) -> bytes | None:
"""Read one frame; ``None`` at EOF, including a truncated header or body."""
try:
header = await self._reader.readexactly(_LENGTH_PREFIX.size)
except asyncio.IncompleteReadError:
return None
(length,) = _LENGTH_PREFIX.unpack(header)
if length > MAX_FRAME_SIZE:
raise FrameTooLargeError(
f"frame length {length} exceeds cap {MAX_FRAME_SIZE}"
)
try:
return await self._reader.readexactly(length)
except asyncio.IncompleteReadError:
return None
async def write_frame(self, data: bytes) -> None:
"""Write one length-prefixed frame and flush it."""
self._writer.write(_LENGTH_PREFIX.pack(len(data)) + data)
await self._writer.drain()
def close(self) -> None:
"""Close the writer side of the connection."""
self._writer.close()
async def wait_closed(self) -> None:
"""Wait for the writer to finish closing."""
await self._writer.wait_closed()
class ChannelClosedError(Exception):
"""Raised when an operation is attempted on a closed channel."""
@@ -91,26 +302,37 @@ class ChannelRemoteError(Exception):
class Channel:
"""One bidirectional request/response channel over a line-oriented stream."""
"""One bidirectional request/response channel over a transport + codec."""
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
reader: asyncio.StreamReader | None = None,
writer: asyncio.StreamWriter | None = None,
*,
transport: Transport | None = None,
codec: Codec | None = None,
name: str = "channel",
max_inflight: int = DEFAULT_MAX_INFLIGHT,
) -> None:
"""Wrap a reader/writer pair into a request/response channel.
"""Wrap a reader/writer pair (or a transport) into a channel.
``max_inflight`` bounds how many handler tasks may run at once.
Once the cap is reached, the read loop keeps draining the wire
but newly-spawned handlers wait on the semaphore until a slot
frees up — so a misbehaving integration can't starve the reader
by fanning out unbounded inbound work.
The common case passes a ``reader``/``writer`` pair, framed with
:class:`StreamTransport` (length-prefixed). To run over a non-stream
transport (e.g. websockets), pass ``transport=`` instead — see
:meth:`from_transport`.
``codec`` defaults to :class:`JsonCodec`. ``max_inflight`` bounds how
many handler tasks may run at once. Once the cap is reached, the read
loop keeps draining the wire but newly-spawned handlers wait on the
semaphore until a slot frees up — so a misbehaving integration can't
starve the reader by fanning out unbounded inbound work.
"""
self._reader = reader
self._writer = writer
if transport is None:
if reader is None or writer is None:
raise TypeError("Channel needs a reader/writer pair or a transport")
transport = StreamTransport(reader, writer)
self._transport: Transport = transport
self._codec: Codec = codec if codec is not None else JsonCodec()
self._name = name
self._next_id = 1
self._pending: dict[int, asyncio.Future[Any]] = {}
@@ -121,6 +343,24 @@ class Channel:
self._inflight: set[asyncio.Task[None]] = set()
self._inflight_sem = asyncio.Semaphore(max_inflight)
@classmethod
def from_transport(
cls,
transport: Transport,
*,
codec: Codec | None = None,
name: str = "channel",
max_inflight: int = DEFAULT_MAX_INFLIGHT,
) -> Channel:
"""Build a channel over an arbitrary :class:`Transport`.
This is the seam a future ``WebSocketTransport`` drops into — the
dispatch core is identical regardless of how frames reach the wire.
"""
return cls(
transport=transport, codec=codec, name=name, max_inflight=max_inflight
)
@property
def closed(self) -> bool:
"""Return True once the channel has been closed."""
@@ -154,7 +394,7 @@ class Channel:
future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
self._pending[call_id] = future
try:
await self._write({"id": call_id, "type": msg_type, "payload": payload})
await self._write(Frame.call(call_id, msg_type, payload))
if timeout is None:
return await future
return await asyncio.wait_for(future, timeout=timeout)
@@ -165,7 +405,7 @@ class Channel:
"""Send a one-way push message; the remote does not reply."""
if self._closed:
raise ChannelClosedError(f"channel {self._name!r} is closed")
await self._write({"type": msg_type, "payload": payload})
await self._write(Frame.push(msg_type, payload))
async def close(self) -> None:
"""Close the channel and cancel any in-flight calls."""
@@ -182,9 +422,9 @@ class Channel:
for task in inflight:
task.cancel()
with contextlib.suppress(Exception):
self._writer.close()
self._transport.close()
with contextlib.suppress(asyncio.CancelledError):
await self._writer.wait_closed()
await self._transport.wait_closed()
if self._reader_task is not None:
self._reader_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
@@ -193,26 +433,31 @@ class Channel:
if inflight:
await asyncio.gather(*inflight, return_exceptions=True)
async def _write(self, message: dict[str, Any]) -> None:
line = json.dumps(message, separators=(",", ":")).encode("utf-8") + b"\n"
async def _write(self, frame: Frame) -> None:
data = self._codec.encode(frame)
async with self._write_lock:
self._writer.write(line)
await self._writer.drain()
await self._transport.write_frame(data)
async def _read_loop(self) -> None:
try:
while True:
line = await self._reader.readline()
if not line:
try:
data = await self._transport.read_frame()
except FrameTooLargeError as err:
_LOGGER.error("Channel %s: %s; aborting channel", self._name, err)
return
if data is None:
return
try:
message = json.loads(line)
except json.JSONDecodeError:
frame = self._codec.decode(data)
except Exception: # noqa: BLE001
_LOGGER.warning(
"Channel %s: dropping malformed line %r", self._name, line
"Channel %s: dropping undecodable frame (%d bytes)",
self._name,
len(data),
)
continue
self._dispatch(message)
self._dispatch(frame)
except asyncio.CancelledError:
raise
except Exception:
@@ -230,56 +475,53 @@ class Channel:
for task in list(self._inflight):
task.cancel()
def _dispatch(self, message: dict[str, Any]) -> None:
"""Route an inbound message; non-blocking — handlers run in tasks."""
if "id" in message and "type" not in message:
def _dispatch(self, frame: Frame) -> None:
"""Route an inbound frame; non-blocking — handlers run in tasks."""
if frame.kind is FrameKind.RESPONSE:
# Response to a call we sent out — set the future inline; no I/O.
call_id = message["id"]
future = self._pending.get(call_id)
future = self._pending.get(frame.id)
if future is None or future.done():
return
if message.get("ok"):
future.set_result(message.get("result"))
if frame.ok:
future.set_result(frame.result)
else:
future.set_exception(
ChannelRemoteError(
message.get("error", "unknown error"),
message.get("error_type"),
message.get("error_data"),
frame.error or "unknown error",
frame.error_type,
frame.error_data,
)
)
return
msg_type = message.get("type")
if msg_type is None:
return
handler = self._handlers.get(msg_type)
payload = message.get("payload")
handler = self._handlers.get(frame.type)
if "id" not in message:
if frame.kind is FrameKind.PUSH:
# One-way push. Dispatch in a task so a slow push handler
# cannot block the reader from draining the next message.
if handler is not None:
self._spawn_handler(self._run_push_handler(msg_type, handler, payload))
self._spawn_handler(
self._run_push_handler(frame.type, handler, frame.payload)
)
return
call_id = message["id"]
if handler is None:
# No work to do — write the unknown-type error directly. Still
# spawn it so a stalled writer cannot stall the reader.
self._spawn_handler(
self._write(
{
"id": call_id,
"ok": False,
"error": f"no handler for {msg_type!r}",
"error_type": "ChannelUnknownType",
}
Frame.error_response(
frame.id,
f"no handler for {frame.type!r}",
"ChannelUnknownType",
)
)
)
return
self._spawn_handler(self._run_call_handler(call_id, msg_type, handler, payload))
self._spawn_handler(
self._run_call_handler(frame.id, frame.type, handler, frame.payload)
)
def _spawn_handler(self, coro: Coroutine[Any, Any, Any]) -> None:
"""Start a handler task and track it for cancellation on close."""
@@ -319,27 +561,32 @@ class Channel:
except Exception as err: # noqa: BLE001
if self._closed:
return
frame: dict[str, Any] = {
"id": call_id,
"ok": False,
"error": str(err) or err.__class__.__name__,
"error_type": err.__class__.__name__,
}
if (error_data := error_data_for(err)) is not None:
frame["error_data"] = error_data
frame = Frame.error_response(
call_id,
str(err) or err.__class__.__name__,
err.__class__.__name__,
error_data_for(err),
)
with contextlib.suppress(Exception):
await self._write(frame)
return
if self._closed:
return
with contextlib.suppress(Exception):
await self._write({"id": call_id, "ok": True, "result": result})
await self._write(Frame.ok_response(call_id, result))
__all__ = [
"Channel",
"ChannelClosedError",
"ChannelRemoteError",
"Codec",
"Frame",
"FrameKind",
"FrameTooLargeError",
"Handler",
"JsonCodec",
"StreamTransport",
"Transport",
"error_data_for",
]
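
The key-based kind inference in JsonCodec.decode above can be summarised in
a few lines. This is a condensed restatement for illustration only;
infer_kind is a hypothetical helper and returns plain strings rather than
Frame objects.

```python
import json


def infer_kind(data: bytes) -> str:
    """Classify a JSON frame body using the same key checks as the diff."""
    message = json.loads(data)
    has_id = "id" in message
    has_type = "type" in message
    if has_id and not has_type:
        return "response"  # reply to a call: id plus ok/result or error
    if not has_id:
        return "push"  # one-way message: no id on the JSON wire
    return "call"  # id and type together: a request expecting a reply
```

A body with an id but no type is always read as a response, so a call whose
type key is missing would be routed to the pending map, not to a handler.
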
+37 -48
@@ -4,11 +4,11 @@ Phase 3 building block. The manager owns one supervised subprocess per
sandbox group (``main`` / ``built-in`` / ``custom``); higher phases call
:meth:`SandboxManager.ensure_started` lazily as config entries are routed.
The websocket protocol between manager and runtime is not yet implemented
— Phase 4 plugs it in. For now the contract is just:
The contract between manager and runtime is:
* the manager launches ``python -m hass_client.sandbox_v2``
* the runtime prints :data:`READY_MARKER` to stdout once it is up
* the runtime opens the control channel and sends a :data:`MSG_READY`
frame as its first message once it is up (no stdout text marker)
* on ``SIGTERM`` the runtime exits cleanly
"""
@@ -24,14 +24,10 @@ import time
from homeassistant.core import HomeAssistant
from .channel import Channel, ChannelClosedError, ChannelRemoteError
from .protocol import MSG_SHUTDOWN
from .protocol import MSG_READY, MSG_SHUTDOWN
_LOGGER = logging.getLogger(__name__)
# Stdout token the runtime prints once it is ready to take work. Kept in
# sync with ``hass_client.sandbox.READY_MARKER``.
READY_MARKER = "sandbox_v2:ready"
DEFAULT_RESTART_LIMIT = 3
DEFAULT_RESTART_WINDOW = 60.0
DEFAULT_RESTART_BACKOFF = 1.0
@@ -85,10 +81,11 @@ class SandboxProcess:
) -> None:
"""Initialise a supervised sandbox subprocess.
``on_channel_ready`` is invoked with the live :class:`Channel` once
the runtime has printed its ready marker. It runs synchronously on
the manager's loop — register Phase 4 protocol handlers there
before any caller can issue a call.
``on_channel_ready`` is invoked with the live :class:`Channel` as
soon as it is opened — before the runtime's :data:`MSG_READY`
frame arrives — so its handlers are in place before the runtime's
own warm-load round-trip lands. It runs synchronously on the
manager's loop.
``on_shutdown_reply`` is invoked with the runtime's reply to
:data:`MSG_SHUTDOWN` (Phase 9) so the caller can persist any
@@ -323,7 +320,27 @@ class SandboxProcess:
return
self._process = proc
ready_task = asyncio.create_task(self._await_ready(proc))
# Open the channel up front — stdout carries nothing but frames now.
# Handlers go on before the reader starts so the runtime's warm-load
# round-trip (and any early push) is never dropped.
channel = self._open_channel(proc)
self._channel = channel
ready_frame = asyncio.Event()
async def _on_ready(_payload: object) -> None:
ready_frame.set()
channel.register(MSG_READY, _on_ready)
if self._on_channel_ready is not None:
try:
self._on_channel_ready(self.group, channel)
except Exception:
_LOGGER.exception(
"Sandbox %s on_channel_ready callback raised", self.group
)
channel.start()
ready_task = asyncio.create_task(ready_frame.wait())
exit_task = asyncio.create_task(proc.wait())
stderr_task = asyncio.create_task(self._drain_stream(proc.stderr, "stderr"))
@@ -332,21 +349,10 @@ class SandboxProcess:
{ready_task, exit_task}, return_when=asyncio.FIRST_COMPLETED
)
if ready_task.done() and not ready_task.cancelled():
if ready_task.exception() is None and ready_task.result():
self._channel = self._open_channel(proc)
if self._on_channel_ready is not None:
try:
self._on_channel_ready(self.group, self._channel)
except Exception:
_LOGGER.exception(
"Sandbox %s on_channel_ready callback raised",
self.group,
)
self._channel.start()
self._state = "running"
self._ready.set()
# Hold here until the process exits.
await exit_task
self._state = "running"
self._ready.set()
# Hold here until the process exits.
await exit_task
finally:
for task in (ready_task, exit_task):
if not task.done():
@@ -366,30 +372,14 @@ class SandboxProcess:
def _open_channel(self, proc: asyncio.subprocess.Process) -> Channel:
"""Wrap the subprocess pipes in a :class:`Channel`.
Stdout is post-marker — the rest is JSON-line protocol. Stdin is
always JSON-line.
``proc.stdout`` is a :class:`~asyncio.StreamReader`; ``proc.stdin``
is a :class:`~asyncio.StreamWriter`. Both carry length-prefixed
channel frames end-to-end — there is no text preamble.
"""
assert proc.stdout is not None
assert proc.stdin is not None
# proc.stdin is a StreamWriter; proc.stdout is a StreamReader. They
# are exactly what Channel needs.
return Channel(proc.stdout, proc.stdin, name=self.group)
async def _await_ready(self, proc: asyncio.subprocess.Process) -> bool:
"""Read stdout until the ready marker arrives or stdout closes."""
stream = proc.stdout
if stream is None:
return False
while True:
line = await stream.readline()
if not line:
return False
text = line.decode("utf-8", errors="replace").rstrip()
if text:
_LOGGER.debug("sandbox %s: %s", self.group, text)
if READY_MARKER in text:
return True
async def _drain_stream(
self, stream: asyncio.StreamReader | None, name: str
) -> None:
@@ -556,7 +546,6 @@ class SandboxManager:
__all__ = [
"READY_MARKER",
"CommandFactory",
"SandboxConfig",
"SandboxFailedError",
@@ -66,6 +66,12 @@ Main → Sandbox shutdown (Phase 9):
from typing import Final
# Handshake (Sandbox → Main): the runtime's first frame on the channel.
# Replaces the old ``sandbox_v2:ready`` stdout text marker — the manager
# registers a handler for this push and treats its arrival as "running",
# so stdout carries nothing but channel frames.
MSG_READY: Final = "sandbox_v2/ready"
# Main → Sandbox
MSG_ENTRY_SETUP: Final = "sandbox_v2/entry_setup"
MSG_ENTRY_UNLOAD: Final = "sandbox_v2/entry_unload"
@@ -89,6 +95,7 @@ __all__ = [
"MSG_ENTRY_SETUP",
"MSG_ENTRY_UNLOAD",
"MSG_FIRE_EVENT",
"MSG_READY",
"MSG_REGISTER_ENTITY",
"MSG_REGISTER_SERVICE",
"MSG_SHUTDOWN",
+292 -68
@@ -4,21 +4,26 @@ Kept as a stand-alone module to honour the project boundary: the HA Core
integration must not import from ``hass_client`` at integration-load time,
and ``hass_client`` does not pull from ``homeassistant.components.*``. The
two files speak the same wire format — see the docstring on the HA side
for the message schema.
for the layering (Channel / Codec / Transport) and the :class:`Frame`
shape.
Inbound calls and pushes are dispatched in their own tasks so a handler that
itself issues :meth:`Channel.call` does not block the reader — the reply for
the nested call has to come back through the same reader. A bounded
semaphore caps how many handlers can run concurrently; the N+1th inbound
message queues at the semaphore (not at the reader) until a slot frees up.
Inbound calls and pushes are dispatched in their own tasks so a handler
that itself issues :meth:`Channel.call` does not block the reader — the
reply for the nested call has to come back through the same reader. A
bounded semaphore caps how many handlers can run concurrently; the N+1th
inbound message queues at the semaphore (not at the reader) until a slot
frees up.
"""
import asyncio
from collections.abc import Awaitable, Callable, Coroutine
import contextlib
from dataclasses import dataclass, field
from enum import StrEnum
import json
import logging
from typing import Any
import struct
from typing import Any, Protocol
import voluptuous as vol
@@ -28,6 +33,13 @@ Handler = Callable[[Any], Awaitable[Any]]
DEFAULT_MAX_INFLIGHT = 16
# Hard cap on a single frame's body. A length prefix larger than this aborts
# the channel rather than letting a compromised peer allocate the process to
# death.
MAX_FRAME_SIZE = 16 * 1024 * 1024
_LENGTH_PREFIX = struct.Struct(">I")
def _serialize_invalid(err: vol.Invalid) -> dict[str, Any]:
"""Capture a ``vol.Invalid``'s message + path so main can rebuild it.
@@ -58,6 +70,186 @@ def error_data_for(err: BaseException) -> dict[str, Any] | None:
return None
class FrameKind(StrEnum):
"""Which of the three wire shapes a :class:`Frame` carries."""
CALL = "call"
PUSH = "push"
RESPONSE = "response"
@dataclass(slots=True)
class Frame:
"""Transport/codec-neutral representation of one wire message."""
kind: FrameKind
id: int = 0
type: str = ""
payload: Any = None
ok: bool = False
result: Any = None
error: str | None = None
error_type: str | None = None
error_data: dict[str, Any] | None = field(default=None)
@classmethod
def call(cls, call_id: int, msg_type: str, payload: Any) -> Frame:
"""Build a request frame that expects a reply."""
return cls(FrameKind.CALL, id=call_id, type=msg_type, payload=payload)
@classmethod
def push(cls, msg_type: str, payload: Any) -> Frame:
"""Build a one-way push frame."""
return cls(FrameKind.PUSH, id=0, type=msg_type, payload=payload)
@classmethod
def ok_response(cls, call_id: int, result: Any) -> Frame:
"""Build a success response frame."""
return cls(FrameKind.RESPONSE, id=call_id, ok=True, result=result)
@classmethod
def error_response(
cls,
call_id: int,
error: str,
error_type: str | None,
error_data: dict[str, Any] | None = None,
) -> Frame:
"""Build a failure response frame."""
return cls(
FrameKind.RESPONSE,
id=call_id,
ok=False,
error=error,
error_type=error_type,
error_data=error_data,
)
class Codec(Protocol):
"""Serialises a :class:`Frame` to bytes and back."""
def encode(self, frame: Frame) -> bytes:
"""Return the wire bytes for ``frame``."""
def decode(self, data: bytes) -> Frame:
"""Rebuild a :class:`Frame` from wire bytes."""
class JsonCodec:
"""One-JSON-object-per-frame codec.
Line-compatible with the original wire shape (sans the trailing
newline, which the length prefix replaces). Kept as the default for
tests and debugging; production rides :class:`ProtobufCodec`.
"""
def encode(self, frame: Frame) -> bytes:
"""Encode a frame to a compact JSON object."""
message: dict[str, Any]
if frame.kind is FrameKind.CALL:
message = {"id": frame.id, "type": frame.type, "payload": frame.payload}
elif frame.kind is FrameKind.PUSH:
message = {"type": frame.type, "payload": frame.payload}
elif frame.ok:
message = {"id": frame.id, "ok": True, "result": frame.result}
else:
message = {
"id": frame.id,
"ok": False,
"error": frame.error,
"error_type": frame.error_type,
}
if frame.error_data is not None:
message["error_data"] = frame.error_data
return json.dumps(message, separators=(",", ":")).encode("utf-8")
def decode(self, data: bytes) -> Frame:
"""Decode a JSON object into a frame, inferring the kind from keys."""
message = json.loads(data)
has_id = "id" in message
has_type = "type" in message
if has_id and not has_type:
# Response to a call we sent out.
if message.get("ok"):
return Frame.ok_response(message["id"], message.get("result"))
return Frame.error_response(
message["id"],
message.get("error", "unknown error"),
message.get("error_type"),
message.get("error_data"),
)
if not has_id:
return Frame.push(message.get("type", ""), message.get("payload"))
return Frame.call(message["id"], message["type"], message.get("payload"))
class Transport(Protocol):
"""Moves whole frame blobs over some byte channel."""
async def read_frame(self) -> bytes | None:
"""Return the next frame's bytes, or ``None`` at end-of-stream."""
async def write_frame(self, data: bytes) -> None:
"""Write one frame's bytes."""
def close(self) -> None:
"""Begin closing the underlying channel."""
async def wait_closed(self) -> None:
"""Wait for the underlying channel to finish closing."""
class FrameTooLargeError(Exception):
"""A peer announced a frame larger than :data:`MAX_FRAME_SIZE`."""
class StreamTransport:
"""Length-prefixed framing over a reader/writer pair.
Each frame is a 4-byte big-endian length followed by exactly that many
body bytes. Used for stdio and unix-socket connections.
"""
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
) -> None:
"""Wrap a reader/writer pair with length-prefixed framing."""
self._reader = reader
self._writer = writer
async def read_frame(self) -> bytes | None:
"""Read one frame; ``None`` at EOF, including a truncated header or body."""
try:
header = await self._reader.readexactly(_LENGTH_PREFIX.size)
except asyncio.IncompleteReadError:
return None
(length,) = _LENGTH_PREFIX.unpack(header)
if length > MAX_FRAME_SIZE:
raise FrameTooLargeError(
f"frame length {length} exceeds cap {MAX_FRAME_SIZE}"
)
try:
return await self._reader.readexactly(length)
except asyncio.IncompleteReadError:
return None
async def write_frame(self, data: bytes) -> None:
"""Write one length-prefixed frame and flush it."""
self._writer.write(_LENGTH_PREFIX.pack(len(data)) + data)
await self._writer.drain()
def close(self) -> None:
"""Close the writer side of the connection."""
self._writer.close()
async def wait_closed(self) -> None:
"""Wait for the writer to finish closing."""
await self._writer.wait_closed()
class ChannelClosedError(Exception):
"""Raised when an operation is attempted on a closed channel."""
@@ -83,26 +275,37 @@ class ChannelRemoteError(Exception):
class Channel:
"""One bidirectional request/response channel over a line-oriented stream."""
"""One bidirectional request/response channel over a transport + codec."""
def __init__(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
reader: asyncio.StreamReader | None = None,
writer: asyncio.StreamWriter | None = None,
*,
transport: Transport | None = None,
codec: Codec | None = None,
name: str = "channel",
max_inflight: int = DEFAULT_MAX_INFLIGHT,
) -> None:
"""Wrap a reader/writer pair into a request/response channel.
"""Wrap a reader/writer pair (or a transport) into a channel.
``max_inflight`` bounds how many handler tasks may run at once.
Once the cap is reached, the read loop keeps draining the wire
but newly-spawned handlers wait on the semaphore until a slot
frees up — so a misbehaving integration can't starve the reader
by fanning out unbounded inbound work.
The common case passes a ``reader``/``writer`` pair, framed with
:class:`StreamTransport` (length-prefixed). To run over a non-stream
transport (e.g. websockets), pass ``transport=`` instead — see
:meth:`from_transport`.
``codec`` defaults to :class:`JsonCodec`. ``max_inflight`` bounds how
many handler tasks may run at once. Once the cap is reached, the read
loop keeps draining the wire but newly-spawned handlers wait on the
semaphore until a slot frees up — so a misbehaving integration can't
starve the reader by fanning out unbounded inbound work.
"""
self._reader = reader
self._writer = writer
if transport is None:
if reader is None or writer is None:
raise TypeError("Channel needs a reader/writer pair or a transport")
transport = StreamTransport(reader, writer)
self._transport: Transport = transport
self._codec: Codec = codec if codec is not None else JsonCodec()
self._name = name
self._next_id = 1
self._pending: dict[int, asyncio.Future[Any]] = {}
@@ -113,6 +316,24 @@ class Channel:
self._inflight: set[asyncio.Task[None]] = set()
self._inflight_sem = asyncio.Semaphore(max_inflight)
@classmethod
def from_transport(
cls,
transport: Transport,
*,
codec: Codec | None = None,
name: str = "channel",
max_inflight: int = DEFAULT_MAX_INFLIGHT,
) -> Channel:
"""Build a channel over an arbitrary :class:`Transport`.
This is the seam a future ``WebSocketTransport`` drops into — the
dispatch core is identical regardless of how frames reach the wire.
"""
return cls(
transport=transport, codec=codec, name=name, max_inflight=max_inflight
)
@property
def closed(self) -> bool:
"""Return True once the channel has been closed."""
@@ -141,7 +362,7 @@ class Channel:
future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
self._pending[call_id] = future
try:
await self._write({"id": call_id, "type": msg_type, "payload": payload})
await self._write(Frame.call(call_id, msg_type, payload))
if timeout is None:
return await future
return await asyncio.wait_for(future, timeout=timeout)
@@ -152,7 +373,7 @@ class Channel:
"""Send a one-way push message; the remote does not reply."""
if self._closed:
raise ChannelClosedError(f"channel {self._name!r} is closed")
await self._write({"type": msg_type, "payload": payload})
await self._write(Frame.push(msg_type, payload))
async def close(self) -> None:
"""Close the channel and cancel any in-flight calls."""
@@ -169,9 +390,9 @@ class Channel:
for task in inflight:
task.cancel()
with contextlib.suppress(Exception):
self._writer.close()
self._transport.close()
with contextlib.suppress(asyncio.CancelledError):
await self._writer.wait_closed()
await self._transport.wait_closed()
if self._reader_task is not None:
self._reader_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
@@ -180,26 +401,33 @@ class Channel:
if inflight:
await asyncio.gather(*inflight, return_exceptions=True)
async def _write(self, message: dict[str, Any]) -> None:
line = json.dumps(message, separators=(",", ":")).encode("utf-8") + b"\n"
async def _write(self, frame: Frame) -> None:
data = self._codec.encode(frame)
async with self._write_lock:
self._writer.write(line)
await self._writer.drain()
await self._transport.write_frame(data)
async def _read_loop(self) -> None:
try:
while True:
line = await self._reader.readline()
if not line:
try:
data = await self._transport.read_frame()
except FrameTooLargeError as err:
_LOGGER.error(
"channel %s: %s; aborting channel", self._name, err
)
return
if data is None:
return
try:
message = json.loads(line)
except json.JSONDecodeError:
frame = self._codec.decode(data)
except Exception: # noqa: BLE001
_LOGGER.warning(
"channel %s: dropping malformed line %r", self._name, line
"channel %s: dropping undecodable frame (%d bytes)",
self._name,
len(data),
)
continue
self._dispatch(message)
self._dispatch(frame)
except asyncio.CancelledError:
raise
except Exception:
@@ -218,54 +446,47 @@ class Channel:
for task in list(self._inflight):
task.cancel()
def _dispatch(self, message: dict[str, Any]) -> None:
"""Route an inbound message; non-blocking — handlers run in tasks."""
if "id" in message and "type" not in message:
call_id = message["id"]
future = self._pending.get(call_id)
def _dispatch(self, frame: Frame) -> None:
"""Route an inbound frame; non-blocking — handlers run in tasks."""
if frame.kind is FrameKind.RESPONSE:
future = self._pending.get(frame.id)
if future is None or future.done():
return
if message.get("ok"):
future.set_result(message.get("result"))
if frame.ok:
future.set_result(frame.result)
else:
future.set_exception(
ChannelRemoteError(
message.get("error", "unknown error"),
message.get("error_type"),
message.get("error_data"),
frame.error or "unknown error",
frame.error_type,
frame.error_data,
)
)
return
msg_type = message.get("type")
if msg_type is None:
return
handler = self._handlers.get(msg_type)
payload = message.get("payload")
handler = self._handlers.get(frame.type)
if "id" not in message:
if frame.kind is FrameKind.PUSH:
if handler is not None:
self._spawn_handler(
self._run_push_handler(msg_type, handler, payload)
self._run_push_handler(frame.type, handler, frame.payload)
)
return
call_id = message["id"]
if handler is None:
self._spawn_handler(
self._write(
{
"id": call_id,
"ok": False,
"error": f"no handler for {msg_type!r}",
"error_type": "ChannelUnknownType",
}
Frame.error_response(
frame.id,
f"no handler for {frame.type!r}",
"ChannelUnknownType",
)
)
)
return
self._spawn_handler(
self._run_call_handler(call_id, msg_type, handler, payload)
self._run_call_handler(frame.id, frame.type, handler, frame.payload)
)
def _spawn_handler(self, coro: Coroutine[Any, Any, Any]) -> None:
@@ -308,29 +529,32 @@ class Channel:
except Exception as err: # noqa: BLE001
if self._closed:
return
frame: dict[str, Any] = {
"id": call_id,
"ok": False,
"error": str(err) or err.__class__.__name__,
"error_type": err.__class__.__name__,
}
if (error_data := error_data_for(err)) is not None:
frame["error_data"] = error_data
frame = Frame.error_response(
call_id,
str(err) or err.__class__.__name__,
err.__class__.__name__,
error_data_for(err),
)
with contextlib.suppress(Exception):
await self._write(frame)
return
if self._closed:
return
with contextlib.suppress(Exception):
await self._write(
{"id": call_id, "ok": True, "result": result}
)
await self._write(Frame.ok_response(call_id, result))
__all__ = [
"Channel",
"ChannelClosedError",
"ChannelRemoteError",
"Codec",
"Frame",
"FrameKind",
"FrameTooLargeError",
"Handler",
"JsonCodec",
"StreamTransport",
"Transport",
"error_data_for",
]
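The framing that StreamTransport applies can be sketched on its own, assuming the 4-byte big-endian prefix and 16 MiB cap described in the commit message. This is a minimal synchronous sketch, not the module's code: `LENGTH_PREFIX`, `pack_frame` and `unpack_frames` are hypothetical names, and `ValueError` stands in for `FrameTooLargeError`.

```python
import io
import struct

# Assumed to match the commit message: 4-byte big-endian unsigned length
# prefix, frames capped at 16 MiB. All names here are local to this sketch.
LENGTH_PREFIX = struct.Struct(">I")
MAX_FRAME_SIZE = 16 * 1024 * 1024


def pack_frame(data: bytes) -> bytes:
    """Prefix one frame body with its length."""
    if len(data) > MAX_FRAME_SIZE:
        raise ValueError(f"frame length {len(data)} exceeds cap {MAX_FRAME_SIZE}")
    return LENGTH_PREFIX.pack(len(data)) + data


def unpack_frames(buffer: bytes) -> list[bytes]:
    """Split a byte buffer into frame bodies, stopping at EOF or truncation."""
    stream = io.BytesIO(buffer)
    frames: list[bytes] = []
    while True:
        header = stream.read(LENGTH_PREFIX.size)
        if len(header) < LENGTH_PREFIX.size:
            return frames  # EOF, or a header cut off mid-way
        (length,) = LENGTH_PREFIX.unpack(header)
        if length > MAX_FRAME_SIZE:
            # The real transport raises FrameTooLargeError here and the
            # read loop aborts the channel.
            raise ValueError(f"frame length {length} exceeds cap {MAX_FRAME_SIZE}")
        body = stream.read(length)
        if len(body) < length:
            return frames  # body truncated mid-frame
        frames.append(body)
```

Checking the cap before reading the body is what keeps a corrupt or hostile length field from forcing a large allocation.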
@@ -9,6 +9,11 @@ for the message catalogue.
from typing import Final
# Handshake: the runtime's first frame on the channel. Replaces the old
# stdout text marker — the manager waits for this push instead of scanning
# stdout, so stdout carries nothing but channel frames.
MSG_READY: Final = "sandbox_v2/ready"
# Main → Sandbox
MSG_ENTRY_SETUP: Final = "sandbox_v2/entry_setup"
MSG_ENTRY_UNLOAD: Final = "sandbox_v2/entry_unload"
@@ -32,6 +37,7 @@ __all__ = [
"MSG_ENTRY_SETUP",
"MSG_ENTRY_UNLOAD",
"MSG_FIRE_EVENT",
"MSG_READY",
"MSG_REGISTER_ENTITY",
"MSG_REGISTER_SERVICE",
"MSG_SHUTDOWN",
+17 -20
@@ -12,10 +12,10 @@ Composes the sandbox's per-process services:
registrations and ``<owned_domain>_*`` events up to main, gated by
:class:`ApprovedDomains` (Phase 6).
The handshake order is unchanged: write the ready marker, open the
stdio channel, register handlers, then idle until SIGTERM (or until
main asks for a graceful shutdown over the channel — see Phase 9's
:meth:`SandboxRuntime._handle_shutdown`).
The handshake: open the stdio channel, send a :data:`MSG_READY` frame
as the first message, warm-load restore state, register handlers, then
idle until SIGTERM (or until main asks for a graceful shutdown over the
channel — see Phase 9's :meth:`SandboxRuntime._handle_shutdown`).
"""
import asyncio
@@ -40,7 +40,7 @@ from .entity_bridge import EntityBridge
from .entry_runner import EntryRunner
from .event_mirror import EventMirror
from .flow_runner import FlowRunner
from .protocol import MSG_SHUTDOWN
from .protocol import MSG_READY, MSG_SHUTDOWN
from .sandbox_bridge import ChannelSandboxBridge
from .service_mirror import ServiceMirror
@@ -48,18 +48,16 @@ _LOGGER = logging.getLogger(__name__)
ChannelFactory = Callable[[], Awaitable[Channel | None]]
# Stdout token the manager scans for to mark the runtime ready. Kept in
# sync with ``homeassistant.components.sandbox_v2.manager.READY_MARKER``.
READY_MARKER = "sandbox_v2:ready"
class SandboxRuntime:
"""Phase 4 runtime: stdout-marker handshake + JSON-line control channel.
"""Runtime: Ready-frame handshake + length-prefixed control channel.
The websocket URL/token still come in on the CLI for forward-compat
with Phase 7 (the scoped sandbox token will travel that path), but
Phase 4 only uses the stdin/stdout control channel that the manager
sets up after the ready marker.
with the deferred WS transport (the scoped sandbox token will travel
that path), but today the runtime only uses the stdin/stdout control
channel that the manager opens. The handshake is a :data:`MSG_READY`
frame sent as the channel's first message — there is no stdout text
marker.
"""
def __init__(
@@ -151,12 +149,6 @@ class SandboxRuntime:
self._service_mirror = ServiceMirror(hass, self._approved)
self._event_mirror = EventMirror(hass, self._approved)
# The marker MUST be written before we hand stdout to asyncio — once
# connect_write_pipe takes ownership of the FD, Python's stdout
# buffer and the StreamWriter race for bytes on the wire.
sys.stdout.write(f"{READY_MARKER}\n")
sys.stdout.flush()
self._channel = await self._channel_factory()
sandbox_token: Any = None
if self._channel is not None:
@@ -193,6 +185,11 @@ class SandboxRuntime:
# cached. Handlers register *after* the warm-load so no
# entry_setup can arrive before the cache is populated.
self._channel.start()
# Signal readiness as the channel's first outbound frame — the
# manager flips to "running" on its arrival. Sent before the
# warm-load so the handshake timing matches the old stdout
# marker (which was written before warm-load too).
await self._channel.push(MSG_READY)
await _load_restore_state(hass)
self._channel.register("sandbox_v2/ping", _handle_ping)
self._channel.register(MSG_SHUTDOWN, self._handle_shutdown)
@@ -371,4 +368,4 @@ async def _handle_ping(_payload: object) -> dict[str, str]:
return {"pong": "sandbox_v2"}
__all__ = ["READY_MARKER", "SandboxRuntime"]
__all__ = ["SandboxRuntime"]
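The receiving side of the Ready handshake can be illustrated roughly as follows. This is only a sketch under assumptions: `ManagerStateSketch` and its `feed` method are hypothetical and are not the manager's API, and the push shape (`type` plus `payload`, no `id`) is taken from the JSON wire format shown in this diff.

```python
import json

# Same value as protocol.MSG_READY in this commit.
MSG_READY = "sandbox_v2/ready"


class ManagerStateSketch:
    """Hypothetical stand-in for the manager's ready tracking."""

    def __init__(self) -> None:
        self.state = "starting"
        # Push handlers keyed by message type, as a channel would hold them.
        self._handlers = {MSG_READY: self._on_ready}

    def _on_ready(self, payload: object) -> None:
        # The first frame from the runtime flips the state to "running".
        self.state = "running"

    def feed(self, frame_body: bytes) -> None:
        """Decode one frame body and route it if it is a known push."""
        message = json.loads(frame_body)
        is_push = "id" not in message
        handler = self._handlers.get(message.get("type"))
        if is_push and handler is not None:
            handler(message.get("payload"))


manager = ManagerStateSketch()
manager.feed(json.dumps({"type": MSG_READY, "payload": None}).encode())
```

Because readiness arrives as an ordinary push frame, the manager needs no separate stdout scanner; unknown push types are simply ignored.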
@@ -9,7 +9,8 @@ public contract: argparser, the constant the manager scans for, and that
import asyncio
from hass_client.channel import Channel
from hass_client.sandbox import READY_MARKER, SandboxRuntime
from hass_client.protocol import MSG_READY
from hass_client.sandbox import SandboxRuntime
from hass_client.sandbox_v2.__main__ import _build_parser
import pytest
@@ -19,9 +20,9 @@ async def _noop_channel_factory() -> Channel | None:
return None
def test_ready_marker_is_stable() -> None:
"""The marker is part of the manager↔runtime protocol; do not rename."""
assert READY_MARKER == "sandbox_v2:ready"
def test_ready_msg_type_is_stable() -> None:
"""The Ready frame type is part of the manager↔runtime protocol."""
assert MSG_READY == "sandbox_v2/ready"
def test_cli_parser_requires_name_url_and_token() -> None:
@@ -103,5 +104,7 @@ async def test_runtime_shuts_down_on_request(
exit_code = await asyncio.wait_for(task, timeout=2.0)
assert exit_code == 0
captured = capsys.readouterr()
assert READY_MARKER in captured.out
# With the noop channel factory there is no channel, so no Ready frame
# is sent and stdout stays clean (the handshake is a channel frame now,
# not a stdout text marker). Drain captured output regardless.
capsys.readouterr()
@@ -6,6 +6,7 @@ import pytest
import voluptuous as vol
from homeassistant.components.sandbox_v2.channel import (
Channel,
ChannelClosedError,
ChannelRemoteError,
)
@@ -13,6 +14,57 @@ from homeassistant.components.sandbox_v2.channel import (
from ._helpers import make_channel_pair
class _QueueTransport:
"""In-memory :class:`Transport` backed by a pair of queues.
Stands in for a non-stream transport (the seam a future
``WebSocketTransport`` uses) so :meth:`Channel.from_transport` is
exercised without any reader/writer pipe.
"""
def __init__(
self, inbox: asyncio.Queue[bytes | None], outbox: asyncio.Queue[bytes | None]
) -> None:
self._inbox = inbox
self._outbox = outbox
self._closed = False
async def read_frame(self) -> bytes | None:
return await self._inbox.get()
async def write_frame(self, data: bytes) -> None:
self._outbox.put_nowait(data)
def close(self) -> None:
if not self._closed:
self._closed = True
self._inbox.put_nowait(None) # EOF sentinel for our reader
async def wait_closed(self) -> None:
return None
async def test_from_transport_round_trips() -> None:
"""A channel built over an arbitrary Transport dispatches normally."""
q1: asyncio.Queue[bytes | None] = asyncio.Queue()
q2: asyncio.Queue[bytes | None] = asyncio.Queue()
channel_a = Channel.from_transport(_QueueTransport(q1, q2), name="a")
channel_b = Channel.from_transport(_QueueTransport(q2, q1), name="b")
channel_a.start()
channel_b.start()
async def echo(payload: object) -> dict[str, object]:
return {"echoed": payload}
channel_b.register("test/echo", echo)
try:
result = await asyncio.wait_for(channel_a.call("test/echo", 7), timeout=2.0)
assert result == {"echoed": 7}
finally:
await channel_a.close()
await channel_b.close()
@pytest.fixture(name="channels")
async def _channels_fixture() -> tuple:
"""Return a paired Channel + Channel, both started, both auto-cleaned."""
@@ -82,7 +82,7 @@ async def test_graceful_shutdown_falls_through_to_sigterm_on_timeout(
) -> None:
"""A sandbox that ignores ``sandbox_v2/shutdown`` is killed by ``stop()``.
The stub runtime here writes the ready marker, then idles forever
The stub runtime here sends the Ready frame, then idles forever
reading from stdin without ever replying. ``async_graceful_shutdown``
must time out; the follow-up ``async_stop_all`` then escalates to
SIGTERM / SIGKILL.
@@ -93,9 +93,14 @@ async def test_graceful_shutdown_falls_through_to_sigterm_on_timeout(
sys.executable,
"-c",
(
"import sys, time;"
"sys.stdout.write('sandbox_v2:ready\\n');"
"sys.stdout.flush();"
"import sys, time, struct, json;"
# Length-prefixed Ready push frame — the manager's
# StreamTransport reads this and flips to "running".
"body = json.dumps("
"{'type': 'sandbox_v2/ready', 'payload': None}"
").encode();"
"sys.stdout.buffer.write(struct.pack('>I', len(body)) + body);"
"sys.stdout.buffer.flush();"
# Just sleep — stdin is wired to the manager but we never read.
"time.sleep(600)"
),