Compare commits

...

1 Commits

Author SHA1 Message Date
Stefan Agner cb8d1f17b1 Reject backup uploads with unsafe inner name
Validate the name field read from backup.json so an attacker-controlled
absolute or separator-bearing value can't escape the backup directory
when it flows through suggested_filename into get_new_backup_path. Also
add a containment check at the filesystem boundary as defense in depth.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 18:18:44 +02:00
5 changed files with 95 additions and 7 deletions
+16 -3
View File
@@ -11,7 +11,7 @@ from homeassistant.helpers.hassio import is_hassio
from .agent import BackupAgent, LocalBackupAgent, OnProgressCallback
from .const import DOMAIN, LOGGER
from .models import AgentBackup, BackupNotFound
from .models import AgentBackup, BackupNotFound, InvalidBackupFilename
from .util import read_backup, suggested_filename
@@ -54,7 +54,13 @@ class CoreLocalBackupAgent(LocalBackupAgent):
try:
backup = read_backup(backup_path)
backups[backup.backup_id] = (backup, backup_path)
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
except (
OSError,
TarError,
json.JSONDecodeError,
KeyError,
InvalidBackupFilename,
) as err:
LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
return backups
@@ -122,7 +128,14 @@ class CoreLocalBackupAgent(LocalBackupAgent):
def get_new_backup_path(self, backup: AgentBackup) -> Path:
"""Return the local path to a new backup."""
return self._backup_dir / suggested_filename(backup)
candidate = self._backup_dir / suggested_filename(backup)
# suggested_filename does not strip separators; refuse paths that would
# land outside the backup directory.
if candidate.parent != self._backup_dir:
raise InvalidBackupFilename(
f"Refusing to write outside {self._backup_dir}: {candidate}"
)
return candidate
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
"""Delete a backup file."""
+7 -1
View File
@@ -1978,7 +1978,13 @@ class CoreBackupReaderWriter(BackupReaderWriter):
try:
backup = await async_add_executor_job(read_backup, temp_file)
except (OSError, tarfile.TarError, json.JSONDecodeError, KeyError) as err:
except (
OSError,
tarfile.TarError,
json.JSONDecodeError,
KeyError,
InvalidBackupFilename,
) as err:
LOGGER.warning("Unable to parse backup %s: %s", temp_file, err)
raise
+10 -3
View File
@@ -6,7 +6,7 @@ import copy
from dataclasses import dataclass, replace
from io import BytesIO
import json
from pathlib import Path, PurePath
from pathlib import Path, PurePath, PureWindowsPath
from queue import SimpleQueue
import tarfile
import threading
@@ -34,7 +34,7 @@ from homeassistant.util.async_iterator import (
from homeassistant.util.json import JsonObjectType, json_loads_object
from .const import BUF_SIZE, LOGGER, SECURETAR_CREATE_VERSION
from .models import AddonInfo, AgentBackup, Folder
from .models import AddonInfo, AgentBackup, Folder, InvalidBackupFilename
class DecryptError(HomeAssistantError):
@@ -109,6 +109,13 @@ def read_backup(backup_path: Path) -> AgentBackup:
extra_metadata = cast(dict[str, bool | str], data.get("extra", {}))
date = extra_metadata.get("supervisor.backup_request_date", data["date"])
name = cast(str, data["name"])
# The name is used to derive the on-disk filename via suggested_filename;
# reject anything that could escape the backup directory.
safe_name = PureWindowsPath(name).name
if safe_name != name or name in ("", ".", ".."):
raise InvalidBackupFilename(f"Invalid backup name: {name!r}")
return AgentBackup(
addons=addons,
backup_id=cast(str, data["slug"]),
@@ -118,7 +125,7 @@ def read_backup(backup_path: Path) -> AgentBackup:
folders=folders,
homeassistant_included=homeassistant_included,
homeassistant_version=homeassistant_version,
name=cast(str, data["name"]),
name=name,
protected=cast(bool, data.get("protected", False)),
size=backup_path.stat().st_size,
)
+30
View File
@@ -2088,6 +2088,36 @@ async def test_receive_backup_path_traversal(
assert resp.status == 400
@pytest.mark.parametrize(
"name",
[
"/absolute/path",
"../parent",
"with/slash",
],
)
async def test_receive_backup_rejects_unsafe_inner_name(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
name: str,
) -> None:
"""Test receive backup rejects an inner name that would escape the backup dir."""
await setup_backup_integration(hass)
client = await hass_client()
backup = replace(TEST_BACKUP_ABC123, name=name)
with patch(
"homeassistant.components.backup.manager.read_backup",
return_value=backup,
):
resp = await client.post(
"/api/backup/upload?agent_id=backup.local",
data={"file": StringIO("test")},
)
assert resp.status == 400
async def test_receive_backup_busy_manager(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
+32
View File
@@ -14,6 +14,7 @@ import pytest
import securetar
from homeassistant.components.backup import DOMAIN, AddonInfo, AgentBackup, Folder
from homeassistant.components.backup.models import InvalidBackupFilename
from homeassistant.components.backup.util import (
DecryptedBackupStreamer,
EncryptedBackupStreamer,
@@ -158,6 +159,37 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
assert backup == expected_backup
@pytest.mark.parametrize(
"name",
[
"/absolute/path",
"../parent",
"with/slash",
"with\\backslash",
"C:\\drive\\path",
"",
".",
"..",
],
)
def test_read_backup_rejects_unsafe_name(name: str) -> None:
"""Test that read_backup rejects names that could escape the backup directory."""
backup_json_content = (
b'{"compressed":true,"date":"2024-12-02T07:23:58.261875-05:00","homeassistant":'
b'{"exclude_database":true,"version":"2024.12.0.dev0"},"name":"'
+ name.encode().replace(b"\\", b"\\\\")
+ b'","protected":true,"slug":"455645fe","type":"partial","version":2}'
)
mock_path = Mock()
mock_path.stat.return_value.st_size = 1234
with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
tar_ctx = mock_open_tar.return_value.__enter__.return_value
tar_ctx.extractfile.return_value.read.return_value = backup_json_content
with pytest.raises(InvalidBackupFilename):
read_backup(mock_path)
@pytest.mark.parametrize(
("backup", "password", "validation_result", "expected_messages"),
[