mirror of
https://github.com/home-assistant/core.git
synced 2026-06-03 18:03:43 +02:00
sandbox_v2: Transport/Codec seam (transport T1)
Split the control channel into three layers so the wire format and the byte transport can each be swapped without touching the concurrency-critical dispatch core:

* Channel: dispatch core (pending map, inflight semaphore, register/call/push/close); speaks Frame objects, never raw bytes.
* Codec (Protocol) + JsonCodec: Frame <-> bytes. JsonCodec is line-compatible with the old wire shape.
* Transport (Protocol) + StreamTransport: whole frame blobs over a reader/writer pair using a 4-byte big-endian length prefix (caps frame size at 16 MiB and aborts the channel on overflow). Channel.from_transport is the drop-in seam for a future WebSocketTransport.

Replaces the stdout text marker (sandbox_v2:ready) with a MSG_READY *frame* sent as the channel's first message; the manager registers a handler for it and flips to "running" on arrival, so stdout now carries nothing but channel frames.

Net behavior identical (still JSON, still stdio); only the framing and handshake changed. Both channel.py mirrors and protocol.py mirrors updated in lockstep. Handshake/marker test assertions updated; added coverage for the from_transport seam via an in-memory queue transport.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
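
To make the new framing concrete, here is a minimal sketch of the 4-byte big-endian length prefix and the 16 MiB cap described in the message. It works on in-memory byte buffers rather than an asyncio stream, and the helper names `pack_frame` and `split_frames` are hypothetical (they do not appear in the commit); `ValueError` stands in for the commit's `FrameTooLargeError`.

```python
import struct

# Same constants as the commit: 4-byte big-endian length, 16 MiB body cap.
LENGTH_PREFIX = struct.Struct(">I")
MAX_FRAME_SIZE = 16 * 1024 * 1024


def pack_frame(body: bytes) -> bytes:
    """Prefix one frame body with its length (hypothetical helper)."""
    return LENGTH_PREFIX.pack(len(body)) + body


def split_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """Return the complete frame bodies in ``buffer`` plus leftover bytes."""
    frames: list[bytes] = []
    offset = 0
    while len(buffer) - offset >= LENGTH_PREFIX.size:
        (length,) = LENGTH_PREFIX.unpack_from(buffer, offset)
        if length > MAX_FRAME_SIZE:
            # Stand-in for FrameTooLargeError: refuse before allocating.
            raise ValueError(f"frame length {length} exceeds cap {MAX_FRAME_SIZE}")
        start = offset + LENGTH_PREFIX.size
        end = start + length
        if end > len(buffer):
            break  # body not fully received yet
        frames.append(buffer[start:end])
        offset = end
    return frames, buffer[offset:]


# Two frames (one with an empty body) followed by a partial header.
wire = pack_frame(b'{"type":"sandbox_v2/ready","payload":null}') + pack_frame(b"")
frames, rest = split_frames(wire + b"\x00\x00")
```

Because the length is checked before the body is read, an oversized announcement is rejected without buffering any of the claimed payload, which is the point of the cap.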
@@ -1,32 +1,54 @@
|
||||
"""JSON-line request/response channel between manager and sandbox runtime.
|
||||
"""Request/response channel between manager and sandbox runtime.
|
||||
|
||||
The wire format is intentionally trivial — one JSON object per line:
|
||||
The channel is split into three layers so the wire format and the byte
|
||||
transport can each be swapped without touching the concurrency-critical
|
||||
dispatch core:
|
||||
|
||||
* **request** (call): ``{"id": int, "type": str, "payload": Any}``
|
||||
* **response**: ``{"id": int, "ok": bool, "result": Any}``
|
||||
or ``{"id": int, "ok": false, "error": str, "error_type": str}``
|
||||
* **push** (one-way): ``{"type": str, "payload": Any}`` — no ``id``, no reply
|
||||
* :class:`Channel` — the dispatch core: pending-id map, inflight
|
||||
semaphore, ``register`` / ``call`` / ``push`` / ``close``. It speaks in
|
||||
:class:`Frame` objects and never touches raw bytes.
|
||||
* :class:`Codec` — turns a :class:`Frame` into bytes and back.
|
||||
:class:`JsonCodec` is the line-compatible default (one JSON object per
|
||||
frame); :mod:`.codec_protobuf` adds the protobuf wire on top of the same
|
||||
seam.
|
||||
* :class:`Transport` — moves whole frame blobs over some byte channel.
|
||||
:class:`StreamTransport` length-prefixes each frame (4-byte big-endian
|
||||
length + body) over an :class:`asyncio.StreamReader` /
|
||||
:class:`asyncio.StreamWriter` pair (stdio, unix socket). A future
|
||||
``WebSocketTransport`` drops in via :meth:`Channel.from_transport` using
|
||||
aiohttp's native binary framing.
|
||||
|
||||
Each side wraps its inbound/outbound byte streams in a :class:`Channel`. The
|
||||
channel is symmetric: either side may call or be called on. The same class
|
||||
runs in the HA Core integration and inside the sandbox subprocess (the
|
||||
sandbox side lives at :mod:`hass_client.channel`; the two are kept in sync
|
||||
by the protocol shape rather than a shared import — the integration must
|
||||
not depend on ``hass_client``).
|
||||
The :class:`Frame` shape mirrors the three message kinds that cross the
|
||||
wire:
|
||||
|
||||
Inbound calls and pushes are dispatched in their own tasks so a handler that
|
||||
itself issues :meth:`Channel.call` does not block the reader — the reply for
|
||||
the nested call has to come back through the same reader. A bounded
|
||||
semaphore caps how many handlers can run concurrently; the N+1th inbound
|
||||
message queues at the semaphore (not at the reader) until a slot frees up.
|
||||
* **call**: ``id`` (>0), ``type``, ``payload`` — expects a reply
|
||||
* **push**: ``id`` 0, ``type``, ``payload`` — one-way, no reply
|
||||
* **response**: ``id`` (>0), ``ok``, and either ``result`` or
|
||||
``error`` / ``error_type`` / ``error_data``
|
||||
|
||||
The channel is symmetric: either side may call or be called on. The same
|
||||
class runs in the HA Core integration and inside the sandbox subprocess
|
||||
(the sandbox side lives at :mod:`hass_client.channel`; the two are kept in
|
||||
sync by the protocol shape rather than a shared import — the integration
|
||||
must not depend on ``hass_client``).
|
||||
|
||||
Inbound calls and pushes are dispatched in their own tasks so a handler
|
||||
that itself issues :meth:`Channel.call` does not block the reader — the
|
||||
reply for the nested call has to come back through the same reader. A
|
||||
bounded semaphore caps how many handlers can run concurrently; the N+1th
|
||||
inbound message queues at the semaphore (not at the reader) until a slot
|
||||
frees up.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Awaitable, Callable, Coroutine
|
||||
import contextlib
|
||||
from dataclasses import dataclass, field
|
||||
from enum import StrEnum
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
import struct
|
||||
from typing import Any, Protocol
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
@@ -36,6 +58,13 @@ Handler = Callable[[Any], Awaitable[Any]]
|
||||
|
||||
DEFAULT_MAX_INFLIGHT = 16
|
||||
|
||||
# Hard cap on a single frame's body. A length prefix larger than this aborts
|
||||
# the channel rather than letting a compromised sandbox allocate the host to
|
||||
# death (same hardening spirit as the auth key check).
|
||||
MAX_FRAME_SIZE = 16 * 1024 * 1024
|
||||
|
||||
_LENGTH_PREFIX = struct.Struct(">I")
|
||||
|
||||
|
||||
def _serialize_invalid(err: vol.Invalid) -> dict[str, Any]:
|
||||
"""Capture a ``vol.Invalid``'s message + path so the peer can rebuild it.
|
||||
@@ -66,6 +95,188 @@ def error_data_for(err: BaseException) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
|
||||
class FrameKind(StrEnum):
|
||||
"""Which of the three wire shapes a :class:`Frame` carries."""
|
||||
|
||||
CALL = "call"
|
||||
PUSH = "push"
|
||||
RESPONSE = "response"
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Frame:
|
||||
"""Transport/codec-neutral representation of one wire message."""
|
||||
|
||||
kind: FrameKind
|
||||
id: int = 0
|
||||
type: str = ""
|
||||
payload: Any = None
|
||||
ok: bool = False
|
||||
result: Any = None
|
||||
error: str | None = None
|
||||
error_type: str | None = None
|
||||
error_data: dict[str, Any] | None = field(default=None)
|
||||
|
||||
@classmethod
|
||||
def call(cls, call_id: int, msg_type: str, payload: Any) -> Frame:
|
||||
"""Build a request frame that expects a reply."""
|
||||
return cls(FrameKind.CALL, id=call_id, type=msg_type, payload=payload)
|
||||
|
||||
@classmethod
|
||||
def push(cls, msg_type: str, payload: Any) -> Frame:
|
||||
"""Build a one-way push frame."""
|
||||
return cls(FrameKind.PUSH, id=0, type=msg_type, payload=payload)
|
||||
|
||||
@classmethod
|
||||
def ok_response(cls, call_id: int, result: Any) -> Frame:
|
||||
"""Build a success response frame."""
|
||||
return cls(FrameKind.RESPONSE, id=call_id, ok=True, result=result)
|
||||
|
||||
@classmethod
|
||||
def error_response(
|
||||
cls,
|
||||
call_id: int,
|
||||
error: str,
|
||||
error_type: str | None,
|
||||
error_data: dict[str, Any] | None = None,
|
||||
) -> Frame:
|
||||
"""Build a failure response frame."""
|
||||
return cls(
|
||||
FrameKind.RESPONSE,
|
||||
id=call_id,
|
||||
ok=False,
|
||||
error=error,
|
||||
error_type=error_type,
|
||||
error_data=error_data,
|
||||
)
|
||||
|
||||
|
||||
class Codec(Protocol):
|
||||
"""Serialises a :class:`Frame` to bytes and back."""
|
||||
|
||||
def encode(self, frame: Frame) -> bytes:
|
||||
"""Return the wire bytes for ``frame``."""
|
||||
|
||||
def decode(self, data: bytes) -> Frame:
|
||||
"""Rebuild a :class:`Frame` from wire bytes."""
|
||||
|
||||
|
||||
class JsonCodec:
|
||||
"""One-JSON-object-per-frame codec.
|
||||
|
||||
Line-compatible with the original wire shape (sans the trailing
|
||||
newline, which the length prefix replaces). Kept as the default for
|
||||
tests and debugging; production rides :class:`ProtobufCodec`.
|
||||
"""
|
||||
|
||||
def encode(self, frame: Frame) -> bytes:
|
||||
"""Encode a frame to a compact JSON object."""
|
||||
message: dict[str, Any]
|
||||
if frame.kind is FrameKind.CALL:
|
||||
message = {"id": frame.id, "type": frame.type, "payload": frame.payload}
|
||||
elif frame.kind is FrameKind.PUSH:
|
||||
message = {"type": frame.type, "payload": frame.payload}
|
||||
elif frame.ok:
|
||||
message = {"id": frame.id, "ok": True, "result": frame.result}
|
||||
else:
|
||||
message = {
|
||||
"id": frame.id,
|
||||
"ok": False,
|
||||
"error": frame.error,
|
||||
"error_type": frame.error_type,
|
||||
}
|
||||
if frame.error_data is not None:
|
||||
message["error_data"] = frame.error_data
|
||||
return json.dumps(message, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
def decode(self, data: bytes) -> Frame:
|
||||
"""Decode a JSON object into a frame, inferring the kind from keys."""
|
||||
message = json.loads(data)
|
||||
has_id = "id" in message
|
||||
has_type = "type" in message
|
||||
if has_id and not has_type:
|
||||
# Response to a call we sent out.
|
||||
if message.get("ok"):
|
||||
return Frame.ok_response(message["id"], message.get("result"))
|
||||
return Frame.error_response(
|
||||
message["id"],
|
||||
message.get("error", "unknown error"),
|
||||
message.get("error_type"),
|
||||
message.get("error_data"),
|
||||
)
|
||||
if not has_id:
|
||||
return Frame.push(message.get("type", ""), message.get("payload"))
|
||||
return Frame.call(message["id"], message["type"], message.get("payload"))
|
||||
|
||||
|
||||
class Transport(Protocol):
|
||||
"""Moves whole frame blobs over some byte channel."""
|
||||
|
||||
async def read_frame(self) -> bytes | None:
|
||||
"""Return the next frame's bytes, or ``None`` at end-of-stream."""
|
||||
|
||||
async def write_frame(self, data: bytes) -> None:
|
||||
"""Write one frame's bytes."""
|
||||
|
||||
def close(self) -> None:
|
||||
"""Begin closing the underlying channel."""
|
||||
|
||||
async def wait_closed(self) -> None:
|
||||
"""Wait for the underlying channel to finish closing."""
|
||||
|
||||
|
||||
class FrameTooLargeError(Exception):
|
||||
"""A peer announced a frame larger than :data:`MAX_FRAME_SIZE`."""
|
||||
|
||||
|
||||
class StreamTransport:
|
||||
"""Length-prefixed framing over a reader/writer pair.
|
||||
|
||||
Each frame is a 4-byte big-endian length followed by exactly that many
|
||||
body bytes. Used for stdio and unix-socket connections — anywhere the
|
||||
byte channel is an :class:`asyncio.StreamReader` /
|
||||
:class:`asyncio.StreamWriter` pair.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
) -> None:
|
||||
"""Wrap a reader/writer pair with length-prefixed framing."""
|
||||
self._reader = reader
|
||||
self._writer = writer
|
||||
|
||||
async def read_frame(self) -> bytes | None:
|
||||
"""Read one length-prefixed frame, or ``None`` at clean EOF."""
|
||||
try:
|
||||
header = await self._reader.readexactly(_LENGTH_PREFIX.size)
|
||||
except asyncio.IncompleteReadError:
|
||||
return None
|
||||
(length,) = _LENGTH_PREFIX.unpack(header)
|
||||
if length > MAX_FRAME_SIZE:
|
||||
raise FrameTooLargeError(
|
||||
f"frame length {length} exceeds cap {MAX_FRAME_SIZE}"
|
||||
)
|
||||
try:
|
||||
return await self._reader.readexactly(length)
|
||||
except asyncio.IncompleteReadError:
|
||||
return None
|
||||
|
||||
async def write_frame(self, data: bytes) -> None:
|
||||
"""Write one length-prefixed frame and flush it."""
|
||||
self._writer.write(_LENGTH_PREFIX.pack(len(data)) + data)
|
||||
await self._writer.drain()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the writer side of the connection."""
|
||||
self._writer.close()
|
||||
|
||||
async def wait_closed(self) -> None:
|
||||
"""Wait for the writer to finish closing."""
|
||||
await self._writer.wait_closed()
|
||||
|
||||
|
||||
class ChannelClosedError(Exception):
|
||||
"""Raised when an operation is attempted on a closed channel."""
|
||||
|
||||
@@ -91,26 +302,37 @@ class ChannelRemoteError(Exception):
|
||||
|
||||
|
||||
class Channel:
|
||||
"""One bidirectional request/response channel over a line-oriented stream."""
|
||||
"""One bidirectional request/response channel over a transport + codec."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
reader: asyncio.StreamReader | None = None,
|
||||
writer: asyncio.StreamWriter | None = None,
|
||||
*,
|
||||
transport: Transport | None = None,
|
||||
codec: Codec | None = None,
|
||||
name: str = "channel",
|
||||
max_inflight: int = DEFAULT_MAX_INFLIGHT,
|
||||
) -> None:
|
||||
"""Wrap a reader/writer pair into a request/response channel.
|
||||
"""Wrap a reader/writer pair (or a transport) into a channel.
|
||||
|
||||
``max_inflight`` bounds how many handler tasks may run at once.
|
||||
Once the cap is reached, the read loop keeps draining the wire
|
||||
but newly-spawned handlers wait on the semaphore until a slot
|
||||
frees up — so a misbehaving integration can't starve the reader
|
||||
by fanning out unbounded inbound work.
|
||||
The common case passes a ``reader``/``writer`` pair, framed with
|
||||
:class:`StreamTransport` (length-prefixed). To run over a non-stream
|
||||
transport (e.g. websockets), pass ``transport=`` instead — see
|
||||
:meth:`from_transport`.
|
||||
|
||||
``codec`` defaults to :class:`JsonCodec`. ``max_inflight`` bounds how
|
||||
many handler tasks may run at once. Once the cap is reached, the read
|
||||
loop keeps draining the wire but newly-spawned handlers wait on the
|
||||
semaphore until a slot frees up — so a misbehaving integration can't
|
||||
starve the reader by fanning out unbounded inbound work.
|
||||
"""
|
||||
self._reader = reader
|
||||
self._writer = writer
|
||||
if transport is None:
|
||||
if reader is None or writer is None:
|
||||
raise TypeError("Channel needs a reader/writer pair or a transport")
|
||||
transport = StreamTransport(reader, writer)
|
||||
self._transport: Transport = transport
|
||||
self._codec: Codec = codec if codec is not None else JsonCodec()
|
||||
self._name = name
|
||||
self._next_id = 1
|
||||
self._pending: dict[int, asyncio.Future[Any]] = {}
|
||||
@@ -121,6 +343,24 @@ class Channel:
|
||||
self._inflight: set[asyncio.Task[None]] = set()
|
||||
self._inflight_sem = asyncio.Semaphore(max_inflight)
|
||||
|
||||
@classmethod
|
||||
def from_transport(
|
||||
cls,
|
||||
transport: Transport,
|
||||
*,
|
||||
codec: Codec | None = None,
|
||||
name: str = "channel",
|
||||
max_inflight: int = DEFAULT_MAX_INFLIGHT,
|
||||
) -> Channel:
|
||||
"""Build a channel over an arbitrary :class:`Transport`.
|
||||
|
||||
This is the seam a future ``WebSocketTransport`` drops into — the
|
||||
dispatch core is identical regardless of how frames reach the wire.
|
||||
"""
|
||||
return cls(
|
||||
transport=transport, codec=codec, name=name, max_inflight=max_inflight
|
||||
)
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
"""Return True once the channel has been closed."""
|
||||
@@ -154,7 +394,7 @@ class Channel:
|
||||
future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
|
||||
self._pending[call_id] = future
|
||||
try:
|
||||
await self._write({"id": call_id, "type": msg_type, "payload": payload})
|
||||
await self._write(Frame.call(call_id, msg_type, payload))
|
||||
if timeout is None:
|
||||
return await future
|
||||
return await asyncio.wait_for(future, timeout=timeout)
|
||||
@@ -165,7 +405,7 @@ class Channel:
|
||||
"""Send a one-way push message; the remote does not reply."""
|
||||
if self._closed:
|
||||
raise ChannelClosedError(f"channel {self._name!r} is closed")
|
||||
await self._write({"type": msg_type, "payload": payload})
|
||||
await self._write(Frame.push(msg_type, payload))
|
||||
|
||||
async def close(self) -> None:
|
||||
"""Close the channel and cancel any in-flight calls."""
|
||||
@@ -182,9 +422,9 @@ class Channel:
|
||||
for task in inflight:
|
||||
task.cancel()
|
||||
with contextlib.suppress(Exception):
|
||||
self._writer.close()
|
||||
self._transport.close()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await self._writer.wait_closed()
|
||||
await self._transport.wait_closed()
|
||||
if self._reader_task is not None:
|
||||
self._reader_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError, Exception):
|
||||
@@ -193,26 +433,31 @@ class Channel:
|
||||
if inflight:
|
||||
await asyncio.gather(*inflight, return_exceptions=True)
|
||||
|
||||
async def _write(self, message: dict[str, Any]) -> None:
|
||||
line = json.dumps(message, separators=(",", ":")).encode("utf-8") + b"\n"
|
||||
async def _write(self, frame: Frame) -> None:
|
||||
data = self._codec.encode(frame)
|
||||
async with self._write_lock:
|
||||
self._writer.write(line)
|
||||
await self._writer.drain()
|
||||
await self._transport.write_frame(data)
|
||||
|
||||
async def _read_loop(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
line = await self._reader.readline()
|
||||
if not line:
|
||||
try:
|
||||
data = await self._transport.read_frame()
|
||||
except FrameTooLargeError as err:
|
||||
_LOGGER.error("Channel %s: %s; aborting channel", self._name, err)
|
||||
return
|
||||
if data is None:
|
||||
return
|
||||
try:
|
||||
message = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
frame = self._codec.decode(data)
|
||||
except Exception: # noqa: BLE001
|
||||
_LOGGER.warning(
|
||||
"Channel %s: dropping malformed line %r", self._name, line
|
||||
"Channel %s: dropping undecodable frame (%d bytes)",
|
||||
self._name,
|
||||
len(data),
|
||||
)
|
||||
continue
|
||||
self._dispatch(message)
|
||||
self._dispatch(frame)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
@@ -230,56 +475,53 @@ class Channel:
|
||||
for task in list(self._inflight):
|
||||
task.cancel()
|
||||
|
||||
def _dispatch(self, message: dict[str, Any]) -> None:
|
||||
"""Route an inbound message; non-blocking — handlers run in tasks."""
|
||||
if "id" in message and "type" not in message:
|
||||
def _dispatch(self, frame: Frame) -> None:
|
||||
"""Route an inbound frame; non-blocking — handlers run in tasks."""
|
||||
if frame.kind is FrameKind.RESPONSE:
|
||||
# Response to a call we sent out — set the future inline; no I/O.
|
||||
call_id = message["id"]
|
||||
future = self._pending.get(call_id)
|
||||
future = self._pending.get(frame.id)
|
||||
if future is None or future.done():
|
||||
return
|
||||
if message.get("ok"):
|
||||
future.set_result(message.get("result"))
|
||||
if frame.ok:
|
||||
future.set_result(frame.result)
|
||||
else:
|
||||
future.set_exception(
|
||||
ChannelRemoteError(
|
||||
message.get("error", "unknown error"),
|
||||
message.get("error_type"),
|
||||
message.get("error_data"),
|
||||
frame.error or "unknown error",
|
||||
frame.error_type,
|
||||
frame.error_data,
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
msg_type = message.get("type")
|
||||
if msg_type is None:
|
||||
return
|
||||
handler = self._handlers.get(msg_type)
|
||||
payload = message.get("payload")
|
||||
handler = self._handlers.get(frame.type)
|
||||
|
||||
if "id" not in message:
|
||||
if frame.kind is FrameKind.PUSH:
|
||||
# One-way push. Dispatch in a task so a slow push handler
|
||||
# cannot block the reader from draining the next message.
|
||||
if handler is not None:
|
||||
self._spawn_handler(self._run_push_handler(msg_type, handler, payload))
|
||||
self._spawn_handler(
|
||||
self._run_push_handler(frame.type, handler, frame.payload)
|
||||
)
|
||||
return
|
||||
|
||||
call_id = message["id"]
|
||||
if handler is None:
|
||||
# No work to do — write the unknown-type error directly. Still
|
||||
# spawn it so a stalled writer cannot stall the reader.
|
||||
self._spawn_handler(
|
||||
self._write(
|
||||
{
|
||||
"id": call_id,
|
||||
"ok": False,
|
||||
"error": f"no handler for {msg_type!r}",
|
||||
"error_type": "ChannelUnknownType",
|
||||
}
|
||||
Frame.error_response(
|
||||
frame.id,
|
||||
f"no handler for {frame.type!r}",
|
||||
"ChannelUnknownType",
|
||||
)
|
||||
)
|
||||
)
|
||||
return
|
||||
|
||||
self._spawn_handler(self._run_call_handler(call_id, msg_type, handler, payload))
|
||||
self._spawn_handler(
|
||||
self._run_call_handler(frame.id, frame.type, handler, frame.payload)
|
||||
)
|
||||
|
||||
def _spawn_handler(self, coro: Coroutine[Any, Any, Any]) -> None:
|
||||
"""Start a handler task and track it for cancellation on close."""
|
||||
@@ -319,27 +561,32 @@ class Channel:
|
||||
except Exception as err: # noqa: BLE001
|
||||
if self._closed:
|
||||
return
|
||||
frame: dict[str, Any] = {
|
||||
"id": call_id,
|
||||
"ok": False,
|
||||
"error": str(err) or err.__class__.__name__,
|
||||
"error_type": err.__class__.__name__,
|
||||
}
|
||||
if (error_data := error_data_for(err)) is not None:
|
||||
frame["error_data"] = error_data
|
||||
frame = Frame.error_response(
|
||||
call_id,
|
||||
str(err) or err.__class__.__name__,
|
||||
err.__class__.__name__,
|
||||
error_data_for(err),
|
||||
)
|
||||
with contextlib.suppress(Exception):
|
||||
await self._write(frame)
|
||||
return
|
||||
if self._closed:
|
||||
return
|
||||
with contextlib.suppress(Exception):
|
||||
await self._write({"id": call_id, "ok": True, "result": result})
|
||||
await self._write(Frame.ok_response(call_id, result))
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Channel",
|
||||
"ChannelClosedError",
|
||||
"ChannelRemoteError",
|
||||
"Codec",
|
||||
"Frame",
|
||||
"FrameKind",
|
||||
"FrameTooLargeError",
|
||||
"Handler",
|
||||
"JsonCodec",
|
||||
"StreamTransport",
|
||||
"Transport",
|
||||
"error_data_for",
|
||||
]
|
||||
|
||||
@@ -4,11 +4,11 @@ Phase 3 building block. The manager owns one supervised subprocess per
|
||||
sandbox group (``main`` / ``built-in`` / ``custom``); higher phases call
|
||||
:meth:`SandboxManager.ensure_started` lazily as config entries are routed.
|
||||
|
||||
The websocket protocol between manager and runtime is not yet implemented
|
||||
— Phase 4 plugs it in. For now the contract is just:
|
||||
The contract between manager and runtime is:
|
||||
|
||||
* the manager launches ``python -m hass_client.sandbox_v2``
|
||||
* the runtime prints :data:`READY_MARKER` to stdout once it is up
|
||||
* the runtime opens the control channel and sends a :data:`MSG_READY`
|
||||
frame as its first message once it is up (no stdout text marker)
|
||||
* on ``SIGTERM`` the runtime exits cleanly
|
||||
"""
|
||||
|
||||
@@ -24,14 +24,10 @@ import time
|
||||
from homeassistant.core import HomeAssistant
|
||||
|
||||
from .channel import Channel, ChannelClosedError, ChannelRemoteError
|
||||
from .protocol import MSG_SHUTDOWN
|
||||
from .protocol import MSG_READY, MSG_SHUTDOWN
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
# Stdout token the runtime prints once it is ready to take work. Kept in
|
||||
# sync with ``hass_client.sandbox.READY_MARKER``.
|
||||
READY_MARKER = "sandbox_v2:ready"
|
||||
|
||||
DEFAULT_RESTART_LIMIT = 3
|
||||
DEFAULT_RESTART_WINDOW = 60.0
|
||||
DEFAULT_RESTART_BACKOFF = 1.0
|
||||
@@ -85,10 +81,11 @@ class SandboxProcess:
|
||||
) -> None:
|
||||
"""Initialise a supervised sandbox subprocess.
|
||||
|
||||
``on_channel_ready`` is invoked with the live :class:`Channel` once
|
||||
the runtime has printed its ready marker. It runs synchronously on
|
||||
the manager's loop — register Phase 4 protocol handlers there
|
||||
before any caller can issue a call.
|
||||
``on_channel_ready`` is invoked with the live :class:`Channel` as
|
||||
soon as it is opened — before the runtime's :data:`MSG_READY`
|
||||
frame arrives — so its handlers are in place before the runtime's
|
||||
own warm-load round-trip lands. It runs synchronously on the
|
||||
manager's loop.
|
||||
|
||||
``on_shutdown_reply`` is invoked with the runtime's reply to
|
||||
:data:`MSG_SHUTDOWN` (Phase 9) so the caller can persist any
|
||||
@@ -323,7 +320,27 @@ class SandboxProcess:
|
||||
return
|
||||
|
||||
self._process = proc
|
||||
ready_task = asyncio.create_task(self._await_ready(proc))
|
||||
# Open the channel up front — stdout carries nothing but frames now.
|
||||
# Handlers go on before the reader starts so the runtime's warm-load
|
||||
# round-trip (and any early push) is never dropped.
|
||||
channel = self._open_channel(proc)
|
||||
self._channel = channel
|
||||
ready_frame = asyncio.Event()
|
||||
|
||||
async def _on_ready(_payload: object) -> None:
|
||||
ready_frame.set()
|
||||
|
||||
channel.register(MSG_READY, _on_ready)
|
||||
if self._on_channel_ready is not None:
|
||||
try:
|
||||
self._on_channel_ready(self.group, channel)
|
||||
except Exception:
|
||||
_LOGGER.exception(
|
||||
"Sandbox %s on_channel_ready callback raised", self.group
|
||||
)
|
||||
channel.start()
|
||||
|
||||
ready_task = asyncio.create_task(ready_frame.wait())
|
||||
exit_task = asyncio.create_task(proc.wait())
|
||||
stderr_task = asyncio.create_task(self._drain_stream(proc.stderr, "stderr"))
|
||||
|
||||
@@ -332,21 +349,10 @@ class SandboxProcess:
|
||||
{ready_task, exit_task}, return_when=asyncio.FIRST_COMPLETED
|
||||
)
|
||||
if ready_task.done() and not ready_task.cancelled():
|
||||
if ready_task.exception() is None and ready_task.result():
|
||||
self._channel = self._open_channel(proc)
|
||||
if self._on_channel_ready is not None:
|
||||
try:
|
||||
self._on_channel_ready(self.group, self._channel)
|
||||
except Exception:
|
||||
_LOGGER.exception(
|
||||
"Sandbox %s on_channel_ready callback raised",
|
||||
self.group,
|
||||
)
|
||||
self._channel.start()
|
||||
self._state = "running"
|
||||
self._ready.set()
|
||||
# Hold here until the process exits.
|
||||
await exit_task
|
||||
self._state = "running"
|
||||
self._ready.set()
|
||||
# Hold here until the process exits.
|
||||
await exit_task
|
||||
finally:
|
||||
for task in (ready_task, exit_task):
|
||||
if not task.done():
|
||||
@@ -366,30 +372,14 @@ class SandboxProcess:
|
||||
def _open_channel(self, proc: asyncio.subprocess.Process) -> Channel:
|
||||
"""Wrap the subprocess pipes in a :class:`Channel`.
|
||||
|
||||
Stdout is post-marker — the rest is JSON-line protocol. Stdin is
|
||||
always JSON-line.
|
||||
``proc.stdout`` is a :class:`~asyncio.StreamReader`; ``proc.stdin``
|
||||
is a :class:`~asyncio.StreamWriter`. Both carry length-prefixed
|
||||
channel frames end-to-end — there is no text preamble.
|
||||
"""
|
||||
assert proc.stdout is not None
|
||||
assert proc.stdin is not None
|
||||
# proc.stdin is a StreamWriter; proc.stdout is a StreamReader. They
|
||||
# are exactly what Channel needs.
|
||||
return Channel(proc.stdout, proc.stdin, name=self.group)
|
||||
|
||||
async def _await_ready(self, proc: asyncio.subprocess.Process) -> bool:
|
||||
"""Read stdout until the ready marker arrives or stdout closes."""
|
||||
stream = proc.stdout
|
||||
if stream is None:
|
||||
return False
|
||||
while True:
|
||||
line = await stream.readline()
|
||||
if not line:
|
||||
return False
|
||||
text = line.decode("utf-8", errors="replace").rstrip()
|
||||
if text:
|
||||
_LOGGER.debug("sandbox %s: %s", self.group, text)
|
||||
if READY_MARKER in text:
|
||||
return True
|
||||
|
||||
async def _drain_stream(
|
||||
self, stream: asyncio.StreamReader | None, name: str
|
||||
) -> None:
|
||||
@@ -556,7 +546,6 @@ class SandboxManager:
|
||||
|
||||
|
||||
__all__ = [
|
||||
"READY_MARKER",
|
||||
"CommandFactory",
|
||||
"SandboxConfig",
|
||||
"SandboxFailedError",
|
||||
|
||||
@@ -66,6 +66,12 @@ Main → Sandbox shutdown (Phase 9):
|
||||
|
||||
from typing import Final
|
||||
|
||||
# Handshake (Sandbox → Main): the runtime's first frame on the channel.
|
||||
# Replaces the old ``sandbox_v2:ready`` stdout text marker — the manager
|
||||
# registers a handler for this push and treats its arrival as "running",
|
||||
# so stdout carries nothing but channel frames.
|
||||
MSG_READY: Final = "sandbox_v2/ready"
|
||||
|
||||
# Main → Sandbox
|
||||
MSG_ENTRY_SETUP: Final = "sandbox_v2/entry_setup"
|
||||
MSG_ENTRY_UNLOAD: Final = "sandbox_v2/entry_unload"
|
||||
@@ -89,6 +95,7 @@ __all__ = [
|
||||
"MSG_ENTRY_SETUP",
|
||||
"MSG_ENTRY_UNLOAD",
|
||||
"MSG_FIRE_EVENT",
|
||||
"MSG_READY",
|
||||
"MSG_REGISTER_ENTITY",
|
||||
"MSG_REGISTER_SERVICE",
|
||||
"MSG_SHUTDOWN",
|
||||
|
||||
@@ -4,21 +4,26 @@ Kept as a stand-alone module to honour the project boundary: the HA Core
|
||||
integration must not import from ``hass_client`` at integration-load time,
|
||||
and ``hass_client`` does not pull from ``homeassistant.components.*``. The
|
||||
two files speak the same wire format — see the docstring on the HA side
|
||||
for the message schema.
|
||||
for the layering (Channel / Codec / Transport) and the :class:`Frame`
|
||||
shape.
|
||||
|
||||
Inbound calls and pushes are dispatched in their own tasks so a handler that
|
||||
itself issues :meth:`Channel.call` does not block the reader — the reply for
|
||||
the nested call has to come back through the same reader. A bounded
|
||||
semaphore caps how many handlers can run concurrently; the N+1th inbound
|
||||
message queues at the semaphore (not at the reader) until a slot frees up.
|
||||
Inbound calls and pushes are dispatched in their own tasks so a handler
|
||||
that itself issues :meth:`Channel.call` does not block the reader — the
|
||||
reply for the nested call has to come back through the same reader. A
|
||||
bounded semaphore caps how many handlers can run concurrently; the N+1th
|
||||
inbound message queues at the semaphore (not at the reader) until a slot
|
||||
frees up.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Awaitable, Callable, Coroutine
|
||||
import contextlib
|
||||
from dataclasses import dataclass, field
|
||||
from enum import StrEnum
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
import struct
|
||||
from typing import Any, Protocol
|
||||
|
||||
import voluptuous as vol
|
||||
|
||||
@@ -28,6 +33,13 @@ Handler = Callable[[Any], Awaitable[Any]]
|
||||
|
||||
DEFAULT_MAX_INFLIGHT = 16
|
||||
|
||||
# Hard cap on a single frame's body. A length prefix larger than this aborts
|
||||
# the channel rather than letting a compromised peer allocate the process to
|
||||
# death.
|
||||
MAX_FRAME_SIZE = 16 * 1024 * 1024
|
||||
|
||||
_LENGTH_PREFIX = struct.Struct(">I")
|
||||
|
||||
|
||||
def _serialize_invalid(err: vol.Invalid) -> dict[str, Any]:
|
||||
"""Capture a ``vol.Invalid``'s message + path so main can rebuild it.
|
||||
@@ -58,6 +70,186 @@ def error_data_for(err: BaseException) -> dict[str, Any] | None:
|
||||
return None
|
||||
|
||||
|
||||
class FrameKind(StrEnum):
|
||||
"""Which of the three wire shapes a :class:`Frame` carries."""
|
||||
|
||||
CALL = "call"
|
||||
PUSH = "push"
|
||||
RESPONSE = "response"
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class Frame:
|
||||
"""Transport/codec-neutral representation of one wire message."""
|
||||
|
||||
kind: FrameKind
|
||||
id: int = 0
|
||||
type: str = ""
|
||||
payload: Any = None
|
||||
ok: bool = False
|
||||
result: Any = None
|
||||
error: str | None = None
|
||||
error_type: str | None = None
|
||||
error_data: dict[str, Any] | None = field(default=None)
|
||||
|
||||
@classmethod
|
||||
def call(cls, call_id: int, msg_type: str, payload: Any) -> Frame:
|
||||
"""Build a request frame that expects a reply."""
|
||||
return cls(FrameKind.CALL, id=call_id, type=msg_type, payload=payload)
|
||||
|
||||
@classmethod
|
||||
def push(cls, msg_type: str, payload: Any) -> Frame:
|
||||
"""Build a one-way push frame."""
|
||||
return cls(FrameKind.PUSH, id=0, type=msg_type, payload=payload)
|
||||
|
||||
@classmethod
|
||||
def ok_response(cls, call_id: int, result: Any) -> Frame:
|
||||
"""Build a success response frame."""
|
||||
return cls(FrameKind.RESPONSE, id=call_id, ok=True, result=result)
|
||||
|
||||
@classmethod
|
||||
def error_response(
|
||||
cls,
|
||||
call_id: int,
|
||||
error: str,
|
||||
error_type: str | None,
|
||||
error_data: dict[str, Any] | None = None,
|
||||
) -> Frame:
|
||||
"""Build a failure response frame."""
|
||||
return cls(
|
||||
FrameKind.RESPONSE,
|
||||
id=call_id,
|
||||
ok=False,
|
||||
error=error,
|
||||
error_type=error_type,
|
||||
error_data=error_data,
|
||||
)
|
||||
|
||||
|
||||
class Codec(Protocol):
|
||||
"""Serialises a :class:`Frame` to bytes and back."""
|
||||
|
||||
def encode(self, frame: Frame) -> bytes:
|
||||
"""Return the wire bytes for ``frame``."""
|
||||
|
||||
def decode(self, data: bytes) -> Frame:
|
||||
"""Rebuild a :class:`Frame` from wire bytes."""
|
||||
|
||||
|
||||
class JsonCodec:
|
||||
"""One-JSON-object-per-frame codec.
|
||||
|
||||
Line-compatible with the original wire shape (sans the trailing
|
||||
newline, which the length prefix replaces). Kept as the default for
|
||||
tests and debugging; production rides :class:`ProtobufCodec`.
|
||||
"""
|
||||
|
||||
def encode(self, frame: Frame) -> bytes:
|
||||
"""Encode a frame to a compact JSON object."""
|
||||
message: dict[str, Any]
|
||||
if frame.kind is FrameKind.CALL:
|
||||
message = {"id": frame.id, "type": frame.type, "payload": frame.payload}
|
||||
elif frame.kind is FrameKind.PUSH:
|
||||
message = {"type": frame.type, "payload": frame.payload}
|
||||
elif frame.ok:
|
||||
message = {"id": frame.id, "ok": True, "result": frame.result}
|
||||
else:
|
||||
message = {
|
||||
"id": frame.id,
|
||||
"ok": False,
|
||||
"error": frame.error,
|
||||
"error_type": frame.error_type,
|
||||
}
|
||||
if frame.error_data is not None:
|
||||
message["error_data"] = frame.error_data
|
||||
return json.dumps(message, separators=(",", ":")).encode("utf-8")
|
||||
|
||||
def decode(self, data: bytes) -> Frame:
|
||||
"""Decode a JSON object into a frame, inferring the kind from keys."""
|
||||
message = json.loads(data)
|
||||
has_id = "id" in message
|
||||
has_type = "type" in message
|
||||
if has_id and not has_type:
|
||||
# Response to a call we sent out.
|
||||
if message.get("ok"):
|
||||
return Frame.ok_response(message["id"], message.get("result"))
|
||||
return Frame.error_response(
|
||||
message["id"],
|
||||
message.get("error", "unknown error"),
|
||||
message.get("error_type"),
|
||||
message.get("error_data"),
|
||||
)
|
||||
if not has_id:
|
||||
return Frame.push(message.get("type", ""), message.get("payload"))
|
||||
return Frame.call(message["id"], message["type"], message.get("payload"))
|
||||
|
||||
|
||||
class Transport(Protocol):
|
||||
"""Moves whole frame blobs over some byte channel."""
|
||||
|
||||
async def read_frame(self) -> bytes | None:
|
||||
"""Return the next frame's bytes, or ``None`` at end-of-stream."""
|
||||
|
||||
async def write_frame(self, data: bytes) -> None:
|
||||
"""Write one frame's bytes."""
|
||||
|
||||
def close(self) -> None:
|
||||
"""Begin closing the underlying channel."""
|
||||
|
||||
async def wait_closed(self) -> None:
|
||||
"""Wait for the underlying channel to finish closing."""
|
||||
|
||||
|
||||
class FrameTooLargeError(Exception):
|
||||
"""A peer announced a frame larger than :data:`MAX_FRAME_SIZE`."""
|
||||
|
||||
|
||||
class StreamTransport:
|
||||
"""Length-prefixed framing over a reader/writer pair.
|
||||
|
||||
Each frame is a 4-byte big-endian length followed by exactly that many
|
||||
body bytes. Used for stdio and unix-socket connections.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
) -> None:
|
||||
"""Wrap a reader/writer pair with length-prefixed framing."""
|
||||
self._reader = reader
|
||||
self._writer = writer
|
||||
|
||||
async def read_frame(self) -> bytes | None:
|
||||
"""Read one length-prefixed frame, or ``None`` at clean EOF."""
|
||||
try:
|
||||
header = await self._reader.readexactly(_LENGTH_PREFIX.size)
|
||||
except asyncio.IncompleteReadError:
|
||||
return None
|
||||
(length,) = _LENGTH_PREFIX.unpack(header)
|
||||
if length > MAX_FRAME_SIZE:
|
||||
raise FrameTooLargeError(
|
||||
f"frame length {length} exceeds cap {MAX_FRAME_SIZE}"
|
||||
)
|
||||
try:
|
||||
return await self._reader.readexactly(length)
|
||||
except asyncio.IncompleteReadError:
|
||||
return None
|
||||
|
||||
async def write_frame(self, data: bytes) -> None:
|
||||
"""Write one length-prefixed frame and flush it."""
|
||||
self._writer.write(_LENGTH_PREFIX.pack(len(data)) + data)
|
||||
await self._writer.drain()
|
||||
|
||||
def close(self) -> None:
|
||||
"""Close the writer side of the connection."""
|
||||
self._writer.close()
|
||||
|
||||
async def wait_closed(self) -> None:
|
||||
"""Wait for the writer to finish closing."""
|
||||
await self._writer.wait_closed()
|
||||
|
||||
|
||||
class ChannelClosedError(Exception):
|
||||
"""Raised when an operation is attempted on a closed channel."""
|
||||
|
||||
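The framing that `StreamTransport` applies (a 4-byte big-endian length followed by the body, with a 16 MiB cap) can be sketched on plain byte buffers. This is an illustration under assumptions, not the transport itself: `pack_frame` and `unpack_frames` are hypothetical helpers, and `ValueError` stands in for `FrameTooLargeError`.

```python
import struct

MAX_FRAME_SIZE = 16 * 1024 * 1024  # same cap as in the diff
_LENGTH_PREFIX = struct.Struct(">I")  # 4-byte big-endian unsigned length


def pack_frame(body: bytes) -> bytes:
    """Prefix a frame body with its length."""
    return _LENGTH_PREFIX.pack(len(body)) + body


def unpack_frames(buffer: bytes) -> tuple[list[bytes], bytes]:
    """Split a buffer into complete frame bodies plus any leftover bytes."""
    frames: list[bytes] = []
    offset = 0
    while len(buffer) - offset >= _LENGTH_PREFIX.size:
        (length,) = _LENGTH_PREFIX.unpack_from(buffer, offset)
        if length > MAX_FRAME_SIZE:
            # Reject before allocating anything for the announced body.
            raise ValueError(f"frame length {length} exceeds cap {MAX_FRAME_SIZE}")
        start = offset + _LENGTH_PREFIX.size
        if len(buffer) - start < length:
            break  # body not fully received yet
        frames.append(buffer[start : start + length])
        offset = start + length
    return frames, buffer[offset:]
```

Because the length check runs on the 4-byte header alone, an oversized announcement is rejected before any body bytes are read, which is the property the `MAX_FRAME_SIZE` comment in the diff relies on.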
@@ -83,26 +275,37 @@ class ChannelRemoteError(Exception):
 
 
 class Channel:
-    """One bidirectional request/response channel over a line-oriented stream."""
+    """One bidirectional request/response channel over a transport + codec."""
 
     def __init__(
         self,
-        reader: asyncio.StreamReader,
-        writer: asyncio.StreamWriter,
+        reader: asyncio.StreamReader | None = None,
+        writer: asyncio.StreamWriter | None = None,
         *,
+        transport: Transport | None = None,
+        codec: Codec | None = None,
         name: str = "channel",
         max_inflight: int = DEFAULT_MAX_INFLIGHT,
     ) -> None:
-        """Wrap a reader/writer pair into a request/response channel.
+        """Wrap a reader/writer pair (or a transport) into a channel.
 
-        ``max_inflight`` bounds how many handler tasks may run at once.
-        Once the cap is reached, the read loop keeps draining the wire
-        but newly-spawned handlers wait on the semaphore until a slot
-        frees up — so a misbehaving integration can't starve the reader
-        by fanning out unbounded inbound work.
+        The common case passes a ``reader``/``writer`` pair, framed with
+        :class:`StreamTransport` (length-prefixed). To run over a non-stream
+        transport (e.g. websockets), pass ``transport=`` instead — see
+        :meth:`from_transport`.
+
+        ``codec`` defaults to :class:`JsonCodec`. ``max_inflight`` bounds how
+        many handler tasks may run at once. Once the cap is reached, the read
+        loop keeps draining the wire but newly-spawned handlers wait on the
+        semaphore until a slot frees up — so a misbehaving integration can't
+        starve the reader by fanning out unbounded inbound work.
         """
-        self._reader = reader
-        self._writer = writer
+        if transport is None:
+            if reader is None or writer is None:
+                raise TypeError("Channel needs a reader/writer pair or a transport")
+            transport = StreamTransport(reader, writer)
+        self._transport: Transport = transport
+        self._codec: Codec = codec if codec is not None else JsonCodec()
         self._name = name
         self._next_id = 1
         self._pending: dict[int, asyncio.Future[Any]] = {}
@@ -113,6 +316,24 @@ class Channel:
|
||||
self._inflight: set[asyncio.Task[None]] = set()
|
||||
self._inflight_sem = asyncio.Semaphore(max_inflight)
|
||||
|
||||
@classmethod
|
||||
def from_transport(
|
||||
cls,
|
||||
transport: Transport,
|
||||
*,
|
||||
codec: Codec | None = None,
|
||||
name: str = "channel",
|
||||
max_inflight: int = DEFAULT_MAX_INFLIGHT,
|
||||
) -> Channel:
|
||||
"""Build a channel over an arbitrary :class:`Transport`.
|
||||
|
||||
This is the seam a future ``WebSocketTransport`` drops into — the
|
||||
dispatch core is identical regardless of how frames reach the wire.
|
||||
"""
|
||||
return cls(
|
||||
transport=transport, codec=codec, name=name, max_inflight=max_inflight
|
||||
)
|
||||
|
||||
@property
|
||||
def closed(self) -> bool:
|
||||
"""Return True once the channel has been closed."""
|
||||
@@ -141,7 +362,7 @@ class Channel:
         future: asyncio.Future[Any] = asyncio.get_running_loop().create_future()
         self._pending[call_id] = future
         try:
-            await self._write({"id": call_id, "type": msg_type, "payload": payload})
+            await self._write(Frame.call(call_id, msg_type, payload))
             if timeout is None:
                 return await future
             return await asyncio.wait_for(future, timeout=timeout)
@@ -152,7 +373,7 @@ class Channel:
         """Send a one-way push message; the remote does not reply."""
         if self._closed:
             raise ChannelClosedError(f"channel {self._name!r} is closed")
-        await self._write({"type": msg_type, "payload": payload})
+        await self._write(Frame.push(msg_type, payload))
 
     async def close(self) -> None:
         """Close the channel and cancel any in-flight calls."""
@@ -169,9 +390,9 @@ class Channel:
         for task in inflight:
             task.cancel()
         with contextlib.suppress(Exception):
-            self._writer.close()
+            self._transport.close()
             with contextlib.suppress(asyncio.CancelledError):
-                await self._writer.wait_closed()
+                await self._transport.wait_closed()
         if self._reader_task is not None:
             self._reader_task.cancel()
             with contextlib.suppress(asyncio.CancelledError, Exception):
@@ -180,26 +401,33 @@ class Channel:
         if inflight:
             await asyncio.gather(*inflight, return_exceptions=True)
 
-    async def _write(self, message: dict[str, Any]) -> None:
-        line = json.dumps(message, separators=(",", ":")).encode("utf-8") + b"\n"
+    async def _write(self, frame: Frame) -> None:
+        data = self._codec.encode(frame)
         async with self._write_lock:
-            self._writer.write(line)
-            await self._writer.drain()
+            await self._transport.write_frame(data)
 
     async def _read_loop(self) -> None:
         try:
             while True:
-                line = await self._reader.readline()
-                if not line:
+                try:
+                    data = await self._transport.read_frame()
+                except FrameTooLargeError as err:
+                    _LOGGER.error(
+                        "channel %s: %s; aborting channel", self._name, err
+                    )
+                    return
+                if data is None:
                     return
                 try:
-                    message = json.loads(line)
-                except json.JSONDecodeError:
+                    frame = self._codec.decode(data)
+                except Exception:  # noqa: BLE001
                     _LOGGER.warning(
-                        "channel %s: dropping malformed line %r", self._name, line
+                        "channel %s: dropping undecodable frame (%d bytes)",
+                        self._name,
+                        len(data),
                     )
                     continue
-                self._dispatch(message)
+                self._dispatch(frame)
         except asyncio.CancelledError:
             raise
         except Exception:
@@ -218,54 +446,47 @@ class Channel:
         for task in list(self._inflight):
             task.cancel()
 
-    def _dispatch(self, message: dict[str, Any]) -> None:
-        """Route an inbound message; non-blocking — handlers run in tasks."""
-        if "id" in message and "type" not in message:
-            call_id = message["id"]
-            future = self._pending.get(call_id)
+    def _dispatch(self, frame: Frame) -> None:
+        """Route an inbound frame; non-blocking — handlers run in tasks."""
+        if frame.kind is FrameKind.RESPONSE:
+            future = self._pending.get(frame.id)
             if future is None or future.done():
                 return
-            if message.get("ok"):
-                future.set_result(message.get("result"))
+            if frame.ok:
+                future.set_result(frame.result)
             else:
                 future.set_exception(
                     ChannelRemoteError(
-                        message.get("error", "unknown error"),
-                        message.get("error_type"),
-                        message.get("error_data"),
+                        frame.error or "unknown error",
+                        frame.error_type,
+                        frame.error_data,
                     )
                 )
             return
 
-        msg_type = message.get("type")
-        if msg_type is None:
-            return
-        handler = self._handlers.get(msg_type)
-        payload = message.get("payload")
+        handler = self._handlers.get(frame.type)
 
-        if "id" not in message:
+        if frame.kind is FrameKind.PUSH:
             if handler is not None:
                 self._spawn_handler(
-                    self._run_push_handler(msg_type, handler, payload)
+                    self._run_push_handler(frame.type, handler, frame.payload)
                 )
             return
 
-        call_id = message["id"]
         if handler is None:
             self._spawn_handler(
                 self._write(
-                    {
-                        "id": call_id,
-                        "ok": False,
-                        "error": f"no handler for {msg_type!r}",
-                        "error_type": "ChannelUnknownType",
-                    }
+                    Frame.error_response(
+                        frame.id,
+                        f"no handler for {frame.type!r}",
+                        "ChannelUnknownType",
+                    )
                 )
             )
             return
 
         self._spawn_handler(
-            self._run_call_handler(call_id, msg_type, handler, payload)
+            self._run_call_handler(frame.id, frame.type, handler, frame.payload)
         )
 
     def _spawn_handler(self, coro: Coroutine[Any, Any, Any]) -> None:
@@ -308,29 +529,32 @@ class Channel:
         except Exception as err:  # noqa: BLE001
             if self._closed:
                 return
-            frame: dict[str, Any] = {
-                "id": call_id,
-                "ok": False,
-                "error": str(err) or err.__class__.__name__,
-                "error_type": err.__class__.__name__,
-            }
-            if (error_data := error_data_for(err)) is not None:
-                frame["error_data"] = error_data
+            frame = Frame.error_response(
+                call_id,
+                str(err) or err.__class__.__name__,
+                err.__class__.__name__,
+                error_data_for(err),
+            )
             with contextlib.suppress(Exception):
                 await self._write(frame)
             return
         if self._closed:
             return
         with contextlib.suppress(Exception):
-            await self._write(
-                {"id": call_id, "ok": True, "result": result}
-            )
+            await self._write(Frame.ok_response(call_id, result))
 
 
 __all__ = [
     "Channel",
     "ChannelClosedError",
     "ChannelRemoteError",
+    "Codec",
+    "Frame",
+    "FrameKind",
+    "FrameTooLargeError",
     "Handler",
+    "JsonCodec",
+    "StreamTransport",
+    "Transport",
     "error_data_for",
 ]
|
||||
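`JsonCodec.decode` in the channel diff above infers the frame kind purely from which keys are present. The decision table can be sketched in isolation; `classify` is a hypothetical helper that returns the kind as a string rather than building a `Frame`.

```python
import json


def classify(data: bytes) -> str:
    """Mirror the key-based kind inference: 'response', 'push' or 'call'."""
    message = json.loads(data)
    has_id = "id" in message
    has_type = "type" in message
    if has_id and not has_type:
        return "response"  # reply to a call sent earlier
    if not has_id:
        return "push"  # one-way message, even when "type" is missing
    return "call"  # has both "id" and "type"
```

One consequence worth noting: an object with neither key (`{}`) is classified as a push with an empty type, which `_dispatch` then ignores unless a handler is registered under the empty string.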
@@ -9,6 +9,11 @@ for the message catalogue.
|
||||
|
||||
from typing import Final
|
||||
|
||||
# Handshake: the runtime's first frame on the channel. Replaces the old
|
||||
# stdout text marker — the manager waits for this push instead of scanning
|
||||
# stdout, so stdout carries nothing but channel frames.
|
||||
MSG_READY: Final = "sandbox_v2/ready"
|
||||
|
||||
# Main → Sandbox
|
||||
MSG_ENTRY_SETUP: Final = "sandbox_v2/entry_setup"
|
||||
MSG_ENTRY_UNLOAD: Final = "sandbox_v2/entry_unload"
|
||||
@@ -32,6 +37,7 @@ __all__ = [
|
||||
"MSG_ENTRY_SETUP",
|
||||
"MSG_ENTRY_UNLOAD",
|
||||
"MSG_FIRE_EVENT",
|
||||
"MSG_READY",
|
||||
"MSG_REGISTER_ENTITY",
|
||||
"MSG_REGISTER_SERVICE",
|
||||
"MSG_SHUTDOWN",
|
||||
|
||||
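As a rough illustration of the new handshake, the Ready message is an ordinary push frame: a JSON object with a `type` and no `id`, wrapped in the 4-byte length prefix. The sketch below builds and recognises such a frame by hand; `build_ready_frame` and `is_ready_frame` are hypothetical helpers, and the real manager goes through `Channel` and a registered handler instead.

```python
import json
import struct

MSG_READY = "sandbox_v2/ready"  # value taken from the diff


def build_ready_frame() -> bytes:
    """Encode the Ready push as length prefix + compact JSON body."""
    body = json.dumps(
        {"type": MSG_READY, "payload": None}, separators=(",", ":")
    ).encode("utf-8")
    return struct.pack(">I", len(body)) + body


def is_ready_frame(raw: bytes) -> bool:
    """Return True if raw holds one length-prefixed Ready push frame."""
    (length,) = struct.unpack_from(">I", raw)
    message = json.loads(raw[4 : 4 + length])
    # A push has no "id"; a call or response with this type is not a handshake.
    return "id" not in message and message.get("type") == MSG_READY
```

Since the handshake now travels in-band, stdout carries only framed bytes, so nothing on the receiving side has to scan for a text marker.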
@@ -12,10 +12,10 @@ Composes the sandbox's per-process services:
   registrations and ``<owned_domain>_*`` events up to main, gated by
   :class:`ApprovedDomains` (Phase 6).
 
-The handshake order is unchanged: write the ready marker, open the
-stdio channel, register handlers, then idle until SIGTERM (or until
-main asks for a graceful shutdown over the channel — see Phase 9's
-:meth:`SandboxRuntime._handle_shutdown`).
+The handshake: open the stdio channel, send a :data:`MSG_READY` frame
+as the first message, warm-load restore state, register handlers, then
+idle until SIGTERM (or until main asks for a graceful shutdown over the
+channel — see Phase 9's :meth:`SandboxRuntime._handle_shutdown`).
 """
 
 import asyncio
@@ -40,7 +40,7 @@ from .entity_bridge import EntityBridge
 from .entry_runner import EntryRunner
 from .event_mirror import EventMirror
 from .flow_runner import FlowRunner
-from .protocol import MSG_SHUTDOWN
+from .protocol import MSG_READY, MSG_SHUTDOWN
 from .sandbox_bridge import ChannelSandboxBridge
 from .service_mirror import ServiceMirror
 
@@ -48,18 +48,16 @@ _LOGGER = logging.getLogger(__name__)
 
 ChannelFactory = Callable[[], Awaitable[Channel | None]]
 
-# Stdout token the manager scans for to mark the runtime ready. Kept in
-# sync with ``homeassistant.components.sandbox_v2.manager.READY_MARKER``.
-READY_MARKER = "sandbox_v2:ready"
-
 
 class SandboxRuntime:
-    """Phase 4 runtime: stdout-marker handshake + JSON-line control channel.
+    """Runtime: Ready-frame handshake + length-prefixed control channel.
 
     The websocket URL/token still come in on the CLI for forward-compat
-    with Phase 7 (the scoped sandbox token will travel that path), but
-    Phase 4 only uses the stdin/stdout control channel that the manager
-    sets up after the ready marker.
+    with the deferred WS transport (the scoped sandbox token will travel
+    that path), but today the runtime only uses the stdin/stdout control
+    channel that the manager opens. The handshake is a :data:`MSG_READY`
+    frame sent as the channel's first message — there is no stdout text
+    marker.
     """
 
     def __init__(
@@ -151,12 +149,6 @@ class SandboxRuntime:
         self._service_mirror = ServiceMirror(hass, self._approved)
         self._event_mirror = EventMirror(hass, self._approved)
 
-        # The marker MUST be written before we hand stdout to asyncio — once
-        # connect_write_pipe takes ownership of the FD, Python's stdout
-        # buffer and the StreamWriter race for bytes on the wire.
-        sys.stdout.write(f"{READY_MARKER}\n")
-        sys.stdout.flush()
-
         self._channel = await self._channel_factory()
         sandbox_token: Any = None
         if self._channel is not None:
@@ -193,6 +185,11 @@ class SandboxRuntime:
             # cached. Handlers register *after* the warm-load so no
             # entry_setup can arrive before the cache is populated.
             self._channel.start()
+            # Signal readiness as the channel's first outbound frame — the
+            # manager flips to "running" on its arrival. Sent before the
+            # warm-load so the handshake timing matches the old stdout
+            # marker (which was written before warm-load too).
+            await self._channel.push(MSG_READY)
             await _load_restore_state(hass)
             self._channel.register("sandbox_v2/ping", _handle_ping)
             self._channel.register(MSG_SHUTDOWN, self._handle_shutdown)
@@ -371,4 +368,4 @@ async def _handle_ping(_payload: object) -> dict[str, str]:
     return {"pong": "sandbox_v2"}
 
 
-__all__ = ["READY_MARKER", "SandboxRuntime"]
+__all__ = ["SandboxRuntime"]
|
||||
@@ -9,7 +9,8 @@ public contract: argparser, the constant the manager scans for, and that
 import asyncio
 
 from hass_client.channel import Channel
-from hass_client.sandbox import READY_MARKER, SandboxRuntime
+from hass_client.protocol import MSG_READY
+from hass_client.sandbox import SandboxRuntime
 from hass_client.sandbox_v2.__main__ import _build_parser
 import pytest
 
@@ -19,9 +20,9 @@ async def _noop_channel_factory() -> Channel | None:
     return None
 
 
-def test_ready_marker_is_stable() -> None:
-    """The marker is part of the manager↔runtime protocol; do not rename."""
-    assert READY_MARKER == "sandbox_v2:ready"
+def test_ready_msg_type_is_stable() -> None:
+    """The Ready frame type is part of the manager↔runtime protocol."""
+    assert MSG_READY == "sandbox_v2/ready"
 
 
 def test_cli_parser_requires_name_url_and_token() -> None:
@@ -103,5 +104,7 @@ async def test_runtime_shuts_down_on_request(
     exit_code = await asyncio.wait_for(task, timeout=2.0)
     assert exit_code == 0
 
-    captured = capsys.readouterr()
-    assert READY_MARKER in captured.out
+    # With the noop channel factory there is no channel, so no Ready frame
+    # is sent and stdout stays clean (the handshake is a channel frame now,
+    # not a stdout text marker). Drain captured output regardless.
+    capsys.readouterr()
|
||||
@@ -6,6 +6,7 @@ import pytest
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.sandbox_v2.channel import (
|
||||
Channel,
|
||||
ChannelClosedError,
|
||||
ChannelRemoteError,
|
||||
)
|
||||
@@ -13,6 +14,57 @@ from homeassistant.components.sandbox_v2.channel import (
|
||||
from ._helpers import make_channel_pair
|
||||
|
||||
|
||||
class _QueueTransport:
|
||||
"""In-memory :class:`Transport` backed by a pair of queues.
|
||||
|
||||
Stands in for a non-stream transport (the seam a future
|
||||
``WebSocketTransport`` uses) so :meth:`Channel.from_transport` is
|
||||
exercised without any reader/writer pipe.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self, inbox: asyncio.Queue[bytes | None], outbox: asyncio.Queue[bytes | None]
|
||||
) -> None:
|
||||
self._inbox = inbox
|
||||
self._outbox = outbox
|
||||
self._closed = False
|
||||
|
||||
async def read_frame(self) -> bytes | None:
|
||||
return await self._inbox.get()
|
||||
|
||||
async def write_frame(self, data: bytes) -> None:
|
||||
self._outbox.put_nowait(data)
|
||||
|
||||
def close(self) -> None:
|
||||
if not self._closed:
|
||||
self._closed = True
|
||||
self._inbox.put_nowait(None) # EOF sentinel for our reader
|
||||
|
||||
async def wait_closed(self) -> None:
|
||||
return None
|
||||
|
||||
|
||||
async def test_from_transport_round_trips() -> None:
|
||||
"""A channel built over an arbitrary Transport dispatches normally."""
|
||||
q1: asyncio.Queue[bytes | None] = asyncio.Queue()
|
||||
q2: asyncio.Queue[bytes | None] = asyncio.Queue()
|
||||
channel_a = Channel.from_transport(_QueueTransport(q1, q2), name="a")
|
||||
channel_b = Channel.from_transport(_QueueTransport(q2, q1), name="b")
|
||||
channel_a.start()
|
||||
channel_b.start()
|
||||
|
||||
async def echo(payload: object) -> dict[str, object]:
|
||||
return {"echoed": payload}
|
||||
|
||||
channel_b.register("test/echo", echo)
|
||||
try:
|
||||
result = await asyncio.wait_for(channel_a.call("test/echo", 7), timeout=2.0)
|
||||
assert result == {"echoed": 7}
|
||||
finally:
|
||||
await channel_a.close()
|
||||
await channel_b.close()
|
||||
|
||||
|
||||
@pytest.fixture(name="channels")
|
||||
async def _channels_fixture() -> tuple:
|
||||
"""Return a paired Channel + Channel, both started, both auto-cleaned."""
|
||||
|
||||
@@ -82,7 +82,7 @@ async def test_graceful_shutdown_falls_through_to_sigterm_on_timeout(
 ) -> None:
     """A sandbox that ignores ``sandbox_v2/shutdown`` is killed by ``stop()``.
 
-    The stub runtime here writes the ready marker, then idles forever
+    The stub runtime here sends the Ready frame, then idles forever
     reading from stdin without ever replying. ``async_graceful_shutdown``
     must time out; the follow-up ``async_stop_all`` then escalates to
     SIGTERM / SIGKILL.
@@ -93,9 +93,14 @@ async def test_graceful_shutdown_falls_through_to_sigterm_on_timeout(
         sys.executable,
         "-c",
         (
-            "import sys, time;"
-            "sys.stdout.write('sandbox_v2:ready\\n');"
-            "sys.stdout.flush();"
+            "import sys, time, struct, json;"
+            # Length-prefixed Ready push frame — the manager's
+            # StreamTransport reads this and flips to "running".
+            "body = json.dumps("
+            "{'type': 'sandbox_v2/ready', 'payload': None}"
+            ").encode();"
+            "sys.stdout.buffer.write(struct.pack('>I', len(body)) + body);"
+            "sys.stdout.buffer.flush();"
             # Just sleep — stdin is wired to the manager but we never read.
             "time.sleep(600)"
         ),