mirror of
https://github.com/home-assistant/core.git
synced 2026-03-21 18:25:15 +01:00
Compare commits
16 Commits
dev
...
esphome-ff
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9847f1c9a4 | ||
|
|
0e3eb972cf | ||
|
|
cd60e14813 | ||
|
|
61e4c5b8e1 | ||
|
|
adc11061fe | ||
|
|
0f4d94fb5b | ||
|
|
d500aed92d | ||
|
|
a628bc5f6f | ||
|
|
cce20b1ec8 | ||
|
|
83a49af083 | ||
|
|
d8da249ae0 | ||
|
|
c24987bb3d | ||
|
|
0b744932ec | ||
|
|
c615194dbe | ||
|
|
96c7bd4882 | ||
|
|
afdc76646a |
@@ -1,10 +1,12 @@
|
||||
"""HTTP view that converts audio from a URL to a preferred format."""
|
||||
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
from collections import defaultdict, deque
|
||||
import contextlib
|
||||
from dataclasses import dataclass, field
|
||||
from http import HTTPStatus
|
||||
import logging
|
||||
import re
|
||||
import secrets
|
||||
from typing import Final
|
||||
|
||||
@@ -22,6 +24,12 @@ from .const import DOMAIN
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
_MAX_CONVERSIONS_PER_DEVICE: Final[int] = 2
|
||||
_MAX_STDERR_LINES: Final[int] = 64
|
||||
_PROC_WAIT_TIMEOUT: Final[int] = 5
|
||||
_STDERR_DRAIN_TIMEOUT: Final[int] = 1
|
||||
_SENSITIVE_QUERY_PARAMS: Final[re.Pattern[str]] = re.compile(
|
||||
r"(?<=[?&])(authSig|token|key|password|secret)=[^&\s]+", re.IGNORECASE
|
||||
)
|
||||
|
||||
|
||||
@callback
|
||||
@@ -215,8 +223,10 @@ class FFmpegConvertResponse(web.StreamResponse):
|
||||
assert proc.stdout is not None
|
||||
assert proc.stderr is not None
|
||||
|
||||
stderr_lines: deque[str] = deque(maxlen=_MAX_STDERR_LINES)
|
||||
stderr_task = self.hass.async_create_background_task(
|
||||
self._dump_ffmpeg_stderr(proc), "ESPHome media proxy dump stderr"
|
||||
self._collect_ffmpeg_stderr(proc, stderr_lines),
|
||||
"ESPHome media proxy dump stderr",
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -235,33 +245,80 @@ class FFmpegConvertResponse(web.StreamResponse):
|
||||
if request.transport:
|
||||
request.transport.abort()
|
||||
raise # don't log error
|
||||
except:
|
||||
except Exception:
|
||||
_LOGGER.exception("Unexpected error during ffmpeg conversion")
|
||||
raise
|
||||
finally:
|
||||
# Allow conversion info to be removed
|
||||
self.convert_info.is_finished = True
|
||||
|
||||
# stop dumping ffmpeg stderr task
|
||||
stderr_task.cancel()
|
||||
# Ensure subprocess and stderr cleanup run even if this task
|
||||
# is cancelled (e.g., during shutdown)
|
||||
try:
|
||||
# Terminate hangs, so kill is used
|
||||
if proc.returncode is None:
|
||||
proc.kill()
|
||||
|
||||
# Terminate hangs, so kill is used
|
||||
if proc.returncode is None:
|
||||
proc.kill()
|
||||
# Wait for process to exit so returncode is set
|
||||
await asyncio.wait_for(proc.wait(), timeout=_PROC_WAIT_TIMEOUT)
|
||||
|
||||
# Let stderr collector finish draining
|
||||
if not stderr_task.done():
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
stderr_task, timeout=_STDERR_DRAIN_TIMEOUT
|
||||
)
|
||||
except TimeoutError:
|
||||
stderr_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await stderr_task
|
||||
except TimeoutError:
|
||||
_LOGGER.warning(
|
||||
"Timed out waiting for ffmpeg process to exit for device %s",
|
||||
self.device_id,
|
||||
)
|
||||
stderr_task.cancel()
|
||||
with contextlib.suppress(asyncio.CancelledError):
|
||||
await stderr_task
|
||||
except asyncio.CancelledError:
|
||||
# Kill the process if we were interrupted
|
||||
if proc.returncode is None:
|
||||
proc.kill()
|
||||
stderr_task.cancel()
|
||||
raise
|
||||
|
||||
if proc.returncode is not None and proc.returncode > 0:
|
||||
_LOGGER.error(
|
||||
"FFmpeg conversion failed for device %s (return code %s):\n%s",
|
||||
self.device_id,
|
||||
proc.returncode,
|
||||
"\n".join(
|
||||
_SENSITIVE_QUERY_PARAMS.sub(r"\1=REDACTED", line)
|
||||
for line in stderr_lines
|
||||
),
|
||||
)
|
||||
|
||||
# Close connection by writing EOF unless already closing
|
||||
if request.transport and not request.transport.is_closing():
|
||||
await writer.write_eof()
|
||||
with contextlib.suppress(ConnectionResetError, RuntimeError, OSError):
|
||||
await writer.write_eof()
|
||||
|
||||
async def _dump_ffmpeg_stderr(
|
||||
async def _collect_ffmpeg_stderr(
|
||||
self,
|
||||
proc: asyncio.subprocess.Process,
|
||||
stderr_lines: deque[str],
|
||||
) -> None:
|
||||
assert proc.stdout is not None
|
||||
"""Collect stderr output from ffmpeg for error reporting."""
|
||||
assert proc.stderr is not None
|
||||
|
||||
while self.hass.is_running and (chunk := await proc.stderr.readline()):
|
||||
_LOGGER.debug("ffmpeg[%s] output: %s", proc.pid, chunk.decode().rstrip())
|
||||
line = chunk.decode(errors="replace").rstrip()
|
||||
stderr_lines.append(line)
|
||||
_LOGGER.debug(
|
||||
"ffmpeg[%s] output: %s",
|
||||
proc.pid,
|
||||
_SENSITIVE_QUERY_PARAMS.sub(r"\1=REDACTED", line),
|
||||
)
|
||||
|
||||
|
||||
class FFmpegProxyView(HomeAssistantView):
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
"""Tests for ffmpeg proxy view."""
|
||||
|
||||
import asyncio
|
||||
from collections.abc import Generator
|
||||
from http import HTTPStatus
|
||||
import io
|
||||
import logging
|
||||
import os
|
||||
import tempfile
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
from urllib.request import pathname2url
|
||||
import wave
|
||||
|
||||
@@ -14,12 +16,17 @@ import mutagen
|
||||
import pytest
|
||||
|
||||
from homeassistant.components import esphome
|
||||
from homeassistant.components.esphome.ffmpeg_proxy import async_create_proxy_url
|
||||
from homeassistant.components.esphome.ffmpeg_proxy import (
|
||||
_MAX_STDERR_LINES,
|
||||
async_create_proxy_url,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.setup import async_setup_component
|
||||
|
||||
from tests.typing import ClientSessionGenerator
|
||||
|
||||
FFMPEG_PROXY = "homeassistant.components.esphome.ffmpeg_proxy"
|
||||
|
||||
|
||||
@pytest.fixture(name="wav_file_length")
|
||||
def wav_file_length_fixture() -> int:
|
||||
@@ -119,6 +126,7 @@ async def test_proxy_view(
|
||||
async def test_ffmpeg_file_doesnt_exist(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test ffmpeg conversion with a file that doesn't exist."""
|
||||
device_id = "1234"
|
||||
@@ -136,6 +144,327 @@ async def test_ffmpeg_file_doesnt_exist(
|
||||
mp3_data = await req.content.read()
|
||||
assert not mp3_data
|
||||
|
||||
# ffmpeg failure should be logged at error level
|
||||
assert "FFmpeg conversion failed for device" in caplog.text
|
||||
assert device_id in caplog.text
|
||||
|
||||
|
||||
async def test_ffmpeg_error_stderr_truncated(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that ffmpeg stderr output is truncated in error logs."""
|
||||
device_id = "1234"
|
||||
|
||||
await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
|
||||
client = await hass_client()
|
||||
|
||||
total_lines = _MAX_STDERR_LINES + 50
|
||||
stderr_lines_data = [f"stderr line {i}\n".encode() for i in range(total_lines)] + [
|
||||
b""
|
||||
]
|
||||
|
||||
async def _stdout_read(_size: int = -1) -> bytes:
|
||||
"""Yield to event loop so stderr collector can run, then return EOF."""
|
||||
await asyncio.sleep(0)
|
||||
return b""
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.stdout.read = _stdout_read
|
||||
mock_proc.stderr.readline = AsyncMock(side_effect=stderr_lines_data)
|
||||
mock_proc.returncode = 1
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
||||
url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
|
||||
req = await client.get(url)
|
||||
assert req.status == HTTPStatus.OK
|
||||
await req.content.read()
|
||||
|
||||
# Should log an error with stderr content
|
||||
assert "FFmpeg conversion failed for device" in caplog.text
|
||||
|
||||
# Find the error message to verify truncation.
|
||||
# We can't just check caplog.text because lines beyond the limit
|
||||
# are still present at debug level from _collect_ffmpeg_stderr.
|
||||
error_message = next(
|
||||
r.message
|
||||
for r in caplog.records
|
||||
if r.levelno >= logging.ERROR and "FFmpeg conversion failed" in r.message
|
||||
)
|
||||
|
||||
total_lines = _MAX_STDERR_LINES + 50
|
||||
|
||||
# The last _MAX_STDERR_LINES lines should be present
|
||||
for i in range(total_lines - _MAX_STDERR_LINES, total_lines):
|
||||
assert f"stderr line {i}" in error_message
|
||||
|
||||
# Early lines that were evicted should not be in the error log
|
||||
assert "stderr line 0" not in error_message
|
||||
|
||||
|
||||
async def test_ffmpeg_error_redacts_sensitive_urls(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that sensitive query params are redacted in error logs."""
|
||||
device_id = "1234"
|
||||
|
||||
await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
|
||||
client = await hass_client()
|
||||
|
||||
sensitive_url = (
|
||||
"https://example.com/api/tts?authSig=secret123&token=abc456&other=keep"
|
||||
)
|
||||
stderr_lines_data = [
|
||||
f"Error opening input file {sensitive_url}\n".encode(),
|
||||
b"",
|
||||
]
|
||||
|
||||
async def _stdout_read(_size: int = -1) -> bytes:
|
||||
await asyncio.sleep(0)
|
||||
return b""
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.stdout.read = _stdout_read
|
||||
mock_proc.stderr.readline = AsyncMock(side_effect=stderr_lines_data)
|
||||
mock_proc.returncode = 1
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
||||
url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
|
||||
req = await client.get(url)
|
||||
assert req.status == HTTPStatus.OK
|
||||
await req.content.read()
|
||||
|
||||
error_message = next(
|
||||
r.message
|
||||
for r in caplog.records
|
||||
if r.levelno >= logging.ERROR and "FFmpeg conversion failed" in r.message
|
||||
)
|
||||
|
||||
assert "authSig=REDACTED" in error_message
|
||||
assert "token=REDACTED" in error_message
|
||||
assert "secret123" not in error_message
|
||||
assert "abc456" not in error_message
|
||||
assert "other=keep" in error_message
|
||||
|
||||
|
||||
async def test_ffmpeg_stderr_drain_timeout(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that stderr drain timeout is handled gracefully."""
|
||||
device_id = "1234"
|
||||
|
||||
await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
|
||||
client = await hass_client()
|
||||
|
||||
never_finish: asyncio.Future[bytes] = asyncio.get_running_loop().create_future()
|
||||
|
||||
call_count = 0
|
||||
|
||||
async def _slow_stderr_readline() -> bytes:
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
if call_count == 1:
|
||||
return b"first error line\n"
|
||||
# Block forever on second call so the drain times out
|
||||
return await never_finish
|
||||
|
||||
async def _stdout_read(_size: int = -1) -> bytes:
|
||||
await asyncio.sleep(0)
|
||||
return b""
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.stdout.read = _stdout_read
|
||||
mock_proc.stderr.readline = _slow_stderr_readline
|
||||
mock_proc.returncode = 1
|
||||
|
||||
with (
|
||||
patch("asyncio.create_subprocess_exec", return_value=mock_proc),
|
||||
patch(f"{FFMPEG_PROXY}._STDERR_DRAIN_TIMEOUT", 0),
|
||||
):
|
||||
url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
|
||||
req = await client.get(url)
|
||||
assert req.status == HTTPStatus.OK
|
||||
await req.content.read()
|
||||
|
||||
assert "FFmpeg conversion failed for device" in caplog.text
|
||||
assert "first error line" in caplog.text
|
||||
|
||||
|
||||
async def test_ffmpeg_proc_wait_timeout(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that proc.wait() timeout is handled gracefully."""
|
||||
device_id = "1234"
|
||||
|
||||
await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
|
||||
client = await hass_client()
|
||||
|
||||
async def _stdout_read(_size: int = -1) -> bytes:
|
||||
await asyncio.sleep(0)
|
||||
return b""
|
||||
|
||||
async def _proc_wait() -> None:
|
||||
# Block forever so wait_for times out
|
||||
await asyncio.Future()
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.stdout.read = _stdout_read
|
||||
mock_proc.stderr.readline = AsyncMock(return_value=b"")
|
||||
mock_proc.returncode = None
|
||||
mock_proc.kill = MagicMock()
|
||||
mock_proc.wait = _proc_wait
|
||||
|
||||
with (
|
||||
patch("asyncio.create_subprocess_exec", return_value=mock_proc),
|
||||
patch(f"{FFMPEG_PROXY}._PROC_WAIT_TIMEOUT", 0),
|
||||
patch(f"{FFMPEG_PROXY}._STDERR_DRAIN_TIMEOUT", 0),
|
||||
):
|
||||
url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
|
||||
req = await client.get(url)
|
||||
assert req.status == HTTPStatus.OK
|
||||
await req.content.read()
|
||||
|
||||
assert "Timed out waiting for ffmpeg process to exit" in caplog.text
|
||||
|
||||
|
||||
async def test_ffmpeg_cleanup_on_cancellation(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
) -> None:
|
||||
"""Test that ffmpeg process is killed when task is cancelled during cleanup."""
|
||||
device_id = "1234"
|
||||
|
||||
await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
|
||||
client = await hass_client()
|
||||
|
||||
async def _stdout_read(_size: int = -1) -> bytes:
|
||||
await asyncio.sleep(0)
|
||||
return b""
|
||||
|
||||
async def _proc_wait() -> None:
|
||||
# Simulate cancellation during proc.wait()
|
||||
raise asyncio.CancelledError
|
||||
|
||||
mock_kill = MagicMock()
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.stdout.read = _stdout_read
|
||||
mock_proc.stderr.readline = AsyncMock(return_value=b"")
|
||||
mock_proc.returncode = None
|
||||
mock_proc.kill = mock_kill
|
||||
mock_proc.wait = _proc_wait
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
||||
url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
|
||||
req = await client.get(url)
|
||||
assert req.status == HTTPStatus.OK
|
||||
with pytest.raises(client_exceptions.ClientPayloadError):
|
||||
await req.content.read()
|
||||
|
||||
# proc.kill should have been called (once in the initial check, once in the
|
||||
# CancelledError handler)
|
||||
assert mock_kill.call_count >= 1
|
||||
|
||||
|
||||
async def test_ffmpeg_unexpected_exception(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that unexpected exceptions during ffmpeg conversion are logged."""
|
||||
device_id = "1234"
|
||||
|
||||
await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
|
||||
client = await hass_client()
|
||||
|
||||
async def _stdout_read_error(_size: int = -1) -> bytes:
|
||||
raise RuntimeError("unexpected read error")
|
||||
|
||||
mock_proc = AsyncMock()
|
||||
mock_proc.stdout.read = _stdout_read_error
|
||||
mock_proc.stderr.readline = AsyncMock(return_value=b"")
|
||||
mock_proc.returncode = 0
|
||||
|
||||
with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
||||
url = async_create_proxy_url(hass, device_id, "dummy-input", media_format="mp3")
|
||||
req = await client.get(url)
|
||||
assert req.status == HTTPStatus.OK
|
||||
await req.content.read()
|
||||
|
||||
assert "Unexpected error during ffmpeg conversion" in caplog.text
|
||||
|
||||
|
||||
async def test_max_conversions_kills_running_process(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
"""Test that exceeding max conversions kills a running ffmpeg process."""
|
||||
device_id = "1234"
|
||||
|
||||
await async_setup_component(hass, esphome.DOMAIN, {esphome.DOMAIN: {}})
|
||||
client = await hass_client()
|
||||
|
||||
stdout_futures: list[asyncio.Future[bytes]] = []
|
||||
mock_kills: list[MagicMock] = []
|
||||
procs_started = asyncio.Event()
|
||||
proc_count = 0
|
||||
|
||||
def _make_mock_proc(*_args: object, **_kwargs: object) -> AsyncMock:
|
||||
"""Create a mock ffmpeg process that blocks on stdout read."""
|
||||
nonlocal proc_count
|
||||
future: asyncio.Future[bytes] = hass.loop.create_future()
|
||||
stdout_futures.append(future)
|
||||
kill = MagicMock()
|
||||
mock_kills.append(kill)
|
||||
|
||||
async def _stdout_read(_size: int = -1) -> bytes:
|
||||
return await future
|
||||
|
||||
mock = AsyncMock()
|
||||
mock.stdout.read = _stdout_read
|
||||
mock.stderr.readline = AsyncMock(return_value=b"")
|
||||
mock.returncode = None
|
||||
mock.kill = kill
|
||||
proc_count += 1
|
||||
if proc_count >= 2:
|
||||
procs_started.set()
|
||||
return mock
|
||||
|
||||
with patch(
|
||||
"asyncio.create_subprocess_exec",
|
||||
side_effect=_make_mock_proc,
|
||||
):
|
||||
url1 = async_create_proxy_url(hass, device_id, "url1", media_format="mp3")
|
||||
url2 = async_create_proxy_url(hass, device_id, "url2", media_format="mp3")
|
||||
|
||||
# Start both HTTP requests — each spawns an ffmpeg process that blocks
|
||||
task1 = hass.async_create_task(client.get(url1))
|
||||
task2 = hass.async_create_task(client.get(url2))
|
||||
|
||||
# Wait until both ffmpeg processes have been created
|
||||
await procs_started.wait()
|
||||
assert len(mock_kills) == 2
|
||||
|
||||
# Creating a third conversion should kill the oldest running process
|
||||
async_create_proxy_url(hass, device_id, "url3", media_format="mp3")
|
||||
assert "Stopping existing ffmpeg process" in caplog.text
|
||||
mock_kills[0].assert_called_once()
|
||||
|
||||
# Unblock stdout reads so background tasks can finish
|
||||
for future in stdout_futures:
|
||||
if not future.done():
|
||||
future.set_result(b"")
|
||||
|
||||
await task1
|
||||
await task2
|
||||
|
||||
|
||||
async def test_lingering_process(
|
||||
hass: HomeAssistant,
|
||||
|
||||
Reference in New Issue
Block a user