Compare commits

...

16 Commits

Author SHA1 Message Date
J. Nick Koston
9847f1c9a4 Skip error log for negative return codes (killed by signal) 2026-03-20 15:51:45 -10:00
J. Nick Koston
0e3eb972cf Await cancelled stderr task in timeout path, fix test realism 2026-03-20 15:00:07 -10:00
J. Nick Koston
cd60e14813 Keep last 64 stderr lines instead of first 64 using deque 2026-03-20 14:51:04 -10:00
J. Nick Koston
61e4c5b8e1 Re-raise CancelledError, narrow write_eof suppress, extract stderr drain timeout 2026-03-20 14:49:41 -10:00
J. Nick Koston
adc11061fe Fix import ordering, move constant after imports 2026-03-20 14:46:16 -10:00
J. Nick Koston
0f4d94fb5b Simplify mock proc factory as side_effect 2026-03-20 14:45:22 -10:00
J. Nick Koston
d500aed92d Extract proc wait timeout to const, patch in test 2026-03-20 14:41:48 -10:00
J. Nick Koston
a628bc5f6f Address review: except Exception, proc.wait timeout, suppress write_eof errors 2026-03-20 14:40:48 -10:00
J. Nick Koston
cce20b1ec8 Fix tests: end-to-end mocked ffmpeg, no hass.data access, deterministic sync
- Remove direct hass.data access in tests
- Use asyncio.Event for deterministic process creation synchronization
- Fix unused variables and top-level imports
- Anchor sensitive param regex to avoid false positives
2026-03-20 14:35:23 -10:00
J. Nick Koston
83a49af083 Add tests for full coverage of ffmpeg_proxy.py 2026-03-20 14:26:15 -10:00
J. Nick Koston
d8da249ae0 Handle cancellation during cleanup, redact sensitive params in debug logs 2026-03-20 14:15:01 -10:00
J. Nick Koston
c24987bb3d Fix import ordering 2026-03-20 14:06:56 -10:00
J. Nick Koston
0b744932ec Use errors=replace for stderr decoding, await cancelled stderr task 2026-03-20 14:00:03 -10:00
J. Nick Koston
c615194dbe Use Python 3.14 except syntax without parentheses 2026-03-20 13:51:55 -10:00
J. Nick Koston
96c7bd4882 Address review feedback
- Wait for ffmpeg process exit before checking returncode
- Let stderr collector drain before logging
- Redact sensitive query params (authSig, token, etc.) from error logs
- Fix test to yield to event loop so stderr collector runs deterministically
- Add test for URL redaction
2026-03-20 13:47:10 -10:00
J. Nick Koston
afdc76646a Log ffmpeg conversion errors in ESPHome media proxy
When ffmpeg fails to convert audio (e.g., TLS errors, unreachable
URLs), the proxy silently serves a 0-byte response. This makes it
very difficult to diagnose issues like voice assistant TTS failures.

Check the ffmpeg process return code after completion and log
collected stderr output at error level when non-zero. Stderr
collection is capped at 64 lines to prevent unbounded memory usage.
2026-03-20 13:37:02 -10:00
2 changed files with 400 additions and 14 deletions

View File

@@ -1,10 +1,12 @@
"""HTTP view that converts audio from a URL to a preferred format."""
import asyncio
from collections import defaultdict
from collections import defaultdict, deque
import contextlib
from dataclasses import dataclass, field
from http import HTTPStatus
import logging
import re
import secrets
from typing import Final
@@ -22,6 +24,12 @@ from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
# NOTE(review): presumably the number of simultaneous conversions allowed per
# device; its use is outside this view -- confirm against the caller.
_MAX_CONVERSIONS_PER_DEVICE: Final[int] = 2
# Used as the maxlen of the deque that collects ffmpeg stderr lines, so only
# the most recent lines are retained for the error log.
_MAX_STDERR_LINES: Final[int] = 64
# Timeout passed to asyncio.wait_for() while waiting for the ffmpeg process
# to exit during cleanup (asyncio.wait_for timeouts are in seconds).
_PROC_WAIT_TIMEOUT: Final[int] = 5
# Timeout passed to asyncio.wait_for() while letting the stderr collector
# task finish draining, before the task is cancelled.
_STDERR_DRAIN_TIMEOUT: Final[int] = 1
# Matches a "name=value" query parameter whose name is one of the listed
# sensitive names (case-insensitive) and which directly follows "?" or "&".
# The value runs up to the next "&" or whitespace. Group 1 is the parameter
# name, so callers substitute r"\1=REDACTED" to keep the name and drop the
# value.
_SENSITIVE_QUERY_PARAMS: Final[re.Pattern[str]] = re.compile(
r"(?<=[?&])(authSig|token|key|password|secret)=[^&\s]+", re.IGNORECASE
)
@callback
@@ -215,8 +223,10 @@ class FFmpegConvertResponse(web.StreamResponse):
assert proc.stdout is not None
assert proc.stderr is not None
stderr_lines: deque[str] = deque(maxlen=_MAX_STDERR_LINES)
stderr_task = self.hass.async_create_background_task(
self._dump_ffmpeg_stderr(proc), "ESPHome media proxy dump stderr"
self._collect_ffmpeg_stderr(proc, stderr_lines),
"ESPHome media proxy dump stderr",
)
try:
@@ -235,33 +245,80 @@ class FFmpegConvertResponse(web.StreamResponse):
if request.transport:
request.transport.abort()
raise # don't log error
except:
except Exception:
_LOGGER.exception("Unexpected error during ffmpeg conversion")
raise
finally:
# Allow conversion info to be removed
self.convert_info.is_finished = True
# stop dumping ffmpeg stderr task
stderr_task.cancel()
# Ensure subprocess and stderr cleanup run even if this task
# is cancelled (e.g., during shutdown)
try:
# Terminate hangs, so kill is used
if proc.returncode is None:
proc.kill()
# Terminate hangs, so kill is used
if proc.returncode is None:
proc.kill()
# Wait for process to exit so returncode is set
await asyncio.wait_for(proc.wait(), timeout=_PROC_WAIT_TIMEOUT)
# Let stderr collector finish draining
if not stderr_task.done():
try:
await asyncio.wait_for(
stderr_task, timeout=_STDERR_DRAIN_TIMEOUT
)
except TimeoutError:
stderr_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await stderr_task
except TimeoutError:
_LOGGER.warning(
"Timed out waiting for ffmpeg process to exit for device %s",
self.device_id,
)
stderr_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await stderr_task
except asyncio.CancelledError:
# Kill the process if we were interrupted
if proc.returncode is None:
proc.kill()
stderr_task.cancel()
raise
if proc.returncode is not None and proc.returncode > 0:
_LOGGER.error(
"FFmpeg conversion failed for device %s (return code %s):\n%s",
self.device_id,
proc.returncode,
"\n".join(
_SENSITIVE_QUERY_PARAMS.sub(r"\1=REDACTED", line)
for line in stderr_lines
),
)
# Close connection by writing EOF unless already closing
if request.transport and not request.transport.is_closing():
await writer.write_eof()
with contextlib.suppress(ConnectionResetError, RuntimeError, OSError):
await writer.write_eof()
async def _dump_ffmpeg_stderr(
async def _collect_ffmpeg_stderr(
self,
proc: asyncio.subprocess.Process,
stderr_lines: deque[str],
) -> None:
assert proc.stdout is not None
"""Collect stderr output from ffmpeg for error reporting."""
assert proc.stderr is not None
while self.hass.is_running and (chunk := await proc.stderr.readline()):
_LOGGER.debug("ffmpeg[%s] output: %s", proc.pid, chunk.decode().rstrip())
line = chunk.decode(errors="replace").rstrip()
stderr_lines.append(line)
_LOGGER.debug(
"ffmpeg[%s] output: %s",
proc.pid,
_SENSITIVE_QUERY_PARAMS.sub(r"\1=REDACTED", line),
)
class FFmpegProxyView(HomeAssistantView):

View File

@@ -1,11 +1,13 @@
"""Tests for ffmpeg proxy view."""
import asyncio
from collections.abc import Generator
from http import HTTPStatus
import io
import logging
import os
import tempfile
from unittest.mock import patch
from unittest.mock import AsyncMock, MagicMock, patch
from urllib.request import pathname2url
import wave
@@ -14,12 +16,17 @@ import mutagen
import pytest
from homeassistant.components import esphome
from homeassistant.components.esphome.ffmpeg_proxy import async_create_proxy_url
from homeassistant.components.esphome.ffmpeg_proxy import (
_MAX_STDERR_LINES,
async_create_proxy_url,
)
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.typing import ClientSessionGenerator
FFMPEG_PROXY = "homeassistant.components.esphome.ffmpeg_proxy"
@pytest.fixture(name="wav_file_length")
def wav_file_length_fixture() -> int:
@@ -119,6 +126,7 @@ async def test_proxy_view(
async def test_ffmpeg_file_doesnt_exist(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test ffmpeg conversion with a file that doesn't exist."""
device_id = "1234"
@@ -136,6 +144,327 @@ async def test_ffmpeg_file_doesnt_exist(
mp3_data = await req.content.read()
assert not mp3_data
# ffmpeg failure should be logged at error level
assert "FFmpeg conversion failed for device" in caplog.text
assert device_id in caplog.text
async def test_ffmpeg_error_stderr_truncated(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Test that ffmpeg stderr output is truncated in error logs.

    A mocked ffmpeg process emits more stderr lines than _MAX_STDERR_LINES
    and reports return code 1. The error-level log record must contain the
    most recent _MAX_STDERR_LINES lines, and every older line must have been
    evicted.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    total_lines = _MAX_STDERR_LINES + 50
    first_kept = total_lines - _MAX_STDERR_LINES
    # The trailing b"" is the EOF marker that ends the stderr collector loop.
    stderr_lines_data = [f"stderr line {i}\n".encode() for i in range(total_lines)] + [
        b""
    ]

    async def _stdout_read(_size: int = -1) -> bytes:
        """Yield to event loop so stderr collector can run, then return EOF."""
        await asyncio.sleep(0)
        return b""

    mock_proc = AsyncMock()
    mock_proc.stdout.read = _stdout_read
    mock_proc.stderr.readline = AsyncMock(side_effect=stderr_lines_data)
    mock_proc.returncode = 1

    with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
        url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
        req = await client.get(url)
        assert req.status == HTTPStatus.OK
        await req.content.read()

    # Should log an error with stderr content
    assert "FFmpeg conversion failed for device" in caplog.text

    # Find the error message to verify truncation.
    # We can't just check caplog.text because lines beyond the limit
    # are still present at debug level from _collect_ffmpeg_stderr.
    error_message = next(
        r.message
        for r in caplog.records
        if r.levelno >= logging.ERROR and "FFmpeg conversion failed" in r.message
    )

    # Compare whole lines rather than substrings: "stderr line 5" is a
    # substring of "stderr line 50", so substring checks cannot tell an
    # evicted line from a retained one.
    logged_lines = set(error_message.splitlines())

    # The last _MAX_STDERR_LINES lines should be present
    for i in range(first_kept, total_lines):
        assert f"stderr line {i}" in logged_lines

    # Every earlier line was evicted and should not be in the error log
    for i in range(first_kept):
        assert f"stderr line {i}" not in logged_lines
async def test_ffmpeg_error_redacts_sensitive_urls(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Verify the error log hides the values of sensitive query parameters.

    The mocked ffmpeg prints a URL carrying authSig and token parameters on
    stderr and reports return code 1. The error record must show the
    parameter names with REDACTED values and leave other parameters intact.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    sensitive_url = (
        "https://example.com/api/tts?authSig=secret123&token=abc456&other=keep"
    )
    # One stderr line followed by the b"" EOF marker.
    stderr_feed = [
        f"Error opening input file {sensitive_url}\n".encode(),
        b"",
    ]

    async def _read_eof_after_yield(_size: int = -1) -> bytes:
        # Give the stderr collector a turn before stdout reports EOF.
        await asyncio.sleep(0)
        return b""

    fake_proc = AsyncMock()
    fake_proc.stdout.read = _read_eof_after_yield
    fake_proc.stderr.readline = AsyncMock(side_effect=stderr_feed)
    fake_proc.returncode = 1

    with patch("asyncio.create_subprocess_exec", return_value=fake_proc):
        url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
        response = await client.get(url)
        assert response.status == HTTPStatus.OK
        await response.content.read()

    error_message = next(
        record.message
        for record in caplog.records
        if record.levelno >= logging.ERROR
        and "FFmpeg conversion failed" in record.message
    )

    assert "authSig=REDACTED" in error_message
    assert "token=REDACTED" in error_message
    for leaked_value in ("secret123", "abc456"):
        assert leaked_value not in error_message
    # Parameters that are not sensitive pass through untouched.
    assert "other=keep" in error_message
async def test_ffmpeg_stderr_drain_timeout(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Verify cleanup copes with a stderr collector that never finishes.

    The mocked stderr yields one line and then blocks forever, and the drain
    timeout is patched to 0. The failure must still be logged together with
    the line that was collected.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    blocked_forever: asyncio.Future[bytes] = (
        asyncio.get_running_loop().create_future()
    )
    queued_lines = [b"first error line\n"]

    async def _slow_stderr_readline() -> bytes:
        if queued_lines:
            return queued_lines.pop(0)
        # Nothing left to hand out: block so the drain times out
        return await blocked_forever

    async def _read_eof_after_yield(_size: int = -1) -> bytes:
        await asyncio.sleep(0)
        return b""

    fake_proc = AsyncMock()
    fake_proc.stdout.read = _read_eof_after_yield
    fake_proc.stderr.readline = _slow_stderr_readline
    fake_proc.returncode = 1

    with (
        patch("asyncio.create_subprocess_exec", return_value=fake_proc),
        patch(f"{FFMPEG_PROXY}._STDERR_DRAIN_TIMEOUT", 0),
    ):
        url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
        response = await client.get(url)
        assert response.status == HTTPStatus.OK
        await response.content.read()

    for expected in ("FFmpeg conversion failed for device", "first error line"):
        assert expected in caplog.text
async def test_ffmpeg_proc_wait_timeout(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Verify a warning is logged when the ffmpeg process never exits.

    proc.wait() is replaced with a coroutine that blocks forever and both
    cleanup timeouts are patched to 0, so waiting for the process times out.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    async def _read_eof_after_yield(_size: int = -1) -> bytes:
        await asyncio.sleep(0)
        return b""

    async def _wait_forever() -> None:
        # A future nobody resolves: wait_for has to time out
        never_done: asyncio.Future[None] = asyncio.Future()
        await never_done

    fake_proc = AsyncMock()
    fake_proc.stdout.read = _read_eof_after_yield
    fake_proc.stderr.readline = AsyncMock(return_value=b"")
    # returncode stays None so the process looks like it is still running.
    fake_proc.returncode = None
    fake_proc.kill = MagicMock()
    fake_proc.wait = _wait_forever

    with (
        patch("asyncio.create_subprocess_exec", return_value=fake_proc),
        patch(f"{FFMPEG_PROXY}._PROC_WAIT_TIMEOUT", 0),
        patch(f"{FFMPEG_PROXY}._STDERR_DRAIN_TIMEOUT", 0),
    ):
        url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
        response = await client.get(url)
        assert response.status == HTTPStatus.OK
        await response.content.read()

    assert "Timed out waiting for ffmpeg process to exit" in caplog.text
async def test_ffmpeg_cleanup_on_cancellation(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
) -> None:
    """Verify ffmpeg is killed when cleanup itself is cancelled.

    proc.wait() raises CancelledError to imitate the handler task being
    cancelled while it waits for the process to exit.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    async def _read_eof_after_yield(_size: int = -1) -> bytes:
        await asyncio.sleep(0)
        return b""

    async def _cancelled_wait() -> None:
        # Stand-in for a cancellation arriving during proc.wait()
        raise asyncio.CancelledError

    kill_spy = MagicMock()
    fake_proc = AsyncMock()
    fake_proc.stdout.read = _read_eof_after_yield
    fake_proc.stderr.readline = AsyncMock(return_value=b"")
    fake_proc.returncode = None
    fake_proc.kill = kill_spy
    fake_proc.wait = _cancelled_wait

    with patch("asyncio.create_subprocess_exec", return_value=fake_proc):
        url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
        response = await client.get(url)
        assert response.status == HTTPStatus.OK
        with pytest.raises(client_exceptions.ClientPayloadError):
            await response.content.read()

    # kill() is reachable from two places: the initial returncode check and
    # the CancelledError handler. Only require that it ran at least once.
    assert kill_spy.call_count >= 1
async def test_ffmpeg_unexpected_exception(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Verify an unexpected error while reading ffmpeg output is logged.

    The mocked stdout read raises RuntimeError on first use.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    async def _failing_stdout_read(_size: int = -1) -> bytes:
        raise RuntimeError("unexpected read error")

    fake_proc = AsyncMock()
    fake_proc.stdout.read = _failing_stdout_read
    fake_proc.stderr.readline = AsyncMock(return_value=b"")
    fake_proc.returncode = 0

    with patch("asyncio.create_subprocess_exec", return_value=fake_proc):
        url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
        response = await client.get(url)
        assert response.status == HTTPStatus.OK
        await response.content.read()

    assert "Unexpected error during ffmpeg conversion" in caplog.text
async def test_max_conversions_kills_running_process(
    hass: HomeAssistant,
    hass_client: ClientSessionGenerator,
    caplog: pytest.LogCaptureFixture,
) -> None:
    """Verify that exceeding the conversion limit kills a running ffmpeg.

    Two requests each start a mocked ffmpeg whose stdout read blocks on a
    future. Creating a third proxy URL for the same device must kill the
    first process.
    """
    device_id = "1234"

    await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
    client = await hass_client()

    pending_reads: list[asyncio.Future[bytes]] = []
    kill_spies: list[MagicMock] = []
    both_started = asyncio.Event()

    def _spawn_fake_proc(*_args: object, **_kwargs: object) -> AsyncMock:
        """Build a mock ffmpeg process whose stdout read blocks on a future."""
        read_gate: asyncio.Future[bytes] = hass.loop.create_future()
        pending_reads.append(read_gate)
        kill_spy = MagicMock()
        kill_spies.append(kill_spy)

        async def _blocked_stdout_read(_size: int = -1) -> bytes:
            return await read_gate

        fake_proc = AsyncMock()
        fake_proc.stdout.read = _blocked_stdout_read
        fake_proc.stderr.readline = AsyncMock(return_value=b"")
        fake_proc.returncode = None
        fake_proc.kill = kill_spy

        # One kill spy is recorded per process, so its count is the number
        # of processes created so far.
        if len(kill_spies) >= 2:
            both_started.set()
        return fake_proc

    with patch(
        "asyncio.create_subprocess_exec",
        side_effect=_spawn_fake_proc,
    ):
        url1 = async_create_proxy_url(hass, device_id, "url1", media_format="mp3")
        url2 = async_create_proxy_url(hass, device_id, "url2", media_format="mp3")

        # Start both HTTP requests — each spawns an ffmpeg process that blocks
        request_tasks = [
            hass.async_create_task(client.get(url1)),
            hass.async_create_task(client.get(url2)),
        ]

        # Wait until both ffmpeg processes have been created
        await both_started.wait()
        assert len(kill_spies) == 2

        # Creating a third conversion should kill the oldest running process
        async_create_proxy_url(hass, device_id, "url3", media_format="mp3")
        assert "Stopping existing ffmpeg process" in caplog.text
        kill_spies[0].assert_called_once()

        # Unblock stdout reads so background tasks can finish
        for read_gate in pending_reads:
            if not read_gate.done():
                read_gate.set_result(b"")

        # Await in creation order, one at a time
        for request_task in request_tasks:
            await request_task
async def test_lingering_process(
hass: HomeAssistant,