mirror of
https://github.com/home-assistant/core.git
synced 2026-05-27 19:25:18 +02:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| cb8d1f17b1 |
@@ -92,7 +92,8 @@ def _extract_backup(
|
||||
):
|
||||
ostf.tar.extractall(
|
||||
path=Path(tempdir, "extracted"),
|
||||
filter="tar",
|
||||
members=securetar.secure_path(ostf.tar),
|
||||
filter="fully_trusted",
|
||||
)
|
||||
backup_meta_file = Path(tempdir, "extracted", "backup.json")
|
||||
backup_meta = json.loads(backup_meta_file.read_text(encoding="utf8"))
|
||||
@@ -118,7 +119,8 @@ def _extract_backup(
|
||||
) as istf:
|
||||
istf.extractall(
|
||||
path=Path(tempdir, "homeassistant"),
|
||||
filter="tar",
|
||||
members=securetar.secure_path(istf),
|
||||
filter="fully_trusted",
|
||||
)
|
||||
if restore_content.restore_homeassistant:
|
||||
keep = list(KEEP_BACKUPS)
|
||||
|
||||
@@ -11,7 +11,7 @@ from homeassistant.helpers.hassio import is_hassio
|
||||
|
||||
from .agent import BackupAgent, LocalBackupAgent, OnProgressCallback
|
||||
from .const import DOMAIN, LOGGER
|
||||
from .models import AgentBackup, BackupNotFound
|
||||
from .models import AgentBackup, BackupNotFound, InvalidBackupFilename
|
||||
from .util import read_backup, suggested_filename
|
||||
|
||||
|
||||
@@ -54,7 +54,13 @@ class CoreLocalBackupAgent(LocalBackupAgent):
|
||||
try:
|
||||
backup = read_backup(backup_path)
|
||||
backups[backup.backup_id] = (backup, backup_path)
|
||||
except (OSError, TarError, json.JSONDecodeError, KeyError) as err:
|
||||
except (
|
||||
OSError,
|
||||
TarError,
|
||||
json.JSONDecodeError,
|
||||
KeyError,
|
||||
InvalidBackupFilename,
|
||||
) as err:
|
||||
LOGGER.warning("Unable to read backup %s: %s", backup_path, err)
|
||||
return backups
|
||||
|
||||
@@ -122,7 +128,14 @@ class CoreLocalBackupAgent(LocalBackupAgent):
|
||||
|
||||
def get_new_backup_path(self, backup: AgentBackup) -> Path:
|
||||
"""Return the local path to a new backup."""
|
||||
return self._backup_dir / suggested_filename(backup)
|
||||
candidate = self._backup_dir / suggested_filename(backup)
|
||||
# suggested_filename does not strip separators; refuse paths that would
|
||||
# land outside the backup directory.
|
||||
if candidate.parent != self._backup_dir:
|
||||
raise InvalidBackupFilename(
|
||||
f"Refusing to write outside {self._backup_dir}: {candidate}"
|
||||
)
|
||||
return candidate
|
||||
|
||||
async def async_delete_backup(self, backup_id: str, **kwargs: Any) -> None:
|
||||
"""Delete a backup file."""
|
||||
|
||||
@@ -1978,7 +1978,13 @@ class CoreBackupReaderWriter(BackupReaderWriter):
|
||||
|
||||
try:
|
||||
backup = await async_add_executor_job(read_backup, temp_file)
|
||||
except (OSError, tarfile.TarError, json.JSONDecodeError, KeyError) as err:
|
||||
except (
|
||||
OSError,
|
||||
tarfile.TarError,
|
||||
json.JSONDecodeError,
|
||||
KeyError,
|
||||
InvalidBackupFilename,
|
||||
) as err:
|
||||
LOGGER.warning("Unable to parse backup %s: %s", temp_file, err)
|
||||
raise
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ import copy
|
||||
from dataclasses import dataclass, replace
|
||||
from io import BytesIO
|
||||
import json
|
||||
from pathlib import Path, PurePath
|
||||
from pathlib import Path, PurePath, PureWindowsPath
|
||||
from queue import SimpleQueue
|
||||
import tarfile
|
||||
import threading
|
||||
@@ -34,7 +34,7 @@ from homeassistant.util.async_iterator import (
|
||||
from homeassistant.util.json import JsonObjectType, json_loads_object
|
||||
|
||||
from .const import BUF_SIZE, LOGGER, SECURETAR_CREATE_VERSION
|
||||
from .models import AddonInfo, AgentBackup, Folder
|
||||
from .models import AddonInfo, AgentBackup, Folder, InvalidBackupFilename
|
||||
|
||||
|
||||
class DecryptError(HomeAssistantError):
|
||||
@@ -109,6 +109,13 @@ def read_backup(backup_path: Path) -> AgentBackup:
|
||||
extra_metadata = cast(dict[str, bool | str], data.get("extra", {}))
|
||||
date = extra_metadata.get("supervisor.backup_request_date", data["date"])
|
||||
|
||||
name = cast(str, data["name"])
|
||||
# The name is used to derive the on-disk filename via suggested_filename;
|
||||
# reject anything that could escape the backup directory.
|
||||
safe_name = PureWindowsPath(name).name
|
||||
if safe_name != name or name in ("", ".", ".."):
|
||||
raise InvalidBackupFilename(f"Invalid backup name: {name!r}")
|
||||
|
||||
return AgentBackup(
|
||||
addons=addons,
|
||||
backup_id=cast(str, data["slug"]),
|
||||
@@ -118,7 +125,7 @@ def read_backup(backup_path: Path) -> AgentBackup:
|
||||
folders=folders,
|
||||
homeassistant_included=homeassistant_included,
|
||||
homeassistant_version=homeassistant_version,
|
||||
name=cast(str, data["name"]),
|
||||
name=name,
|
||||
protected=cast(bool, data.get("protected", False)),
|
||||
size=backup_path.stat().st_size,
|
||||
)
|
||||
|
||||
@@ -21,5 +21,5 @@
|
||||
"integration_type": "system",
|
||||
"preview_features": { "winter_mode": {} },
|
||||
"quality_scale": "internal",
|
||||
"requirements": ["home-assistant-frontend==20260527.0"]
|
||||
"requirements": ["home-assistant-frontend==20260429.4"]
|
||||
}
|
||||
|
||||
@@ -939,7 +939,6 @@ class DPCode(StrEnum):
|
||||
TEMP_INDOOR = "temp_indoor" # Indoor temperature in °C
|
||||
TEMP_SET = "temp_set" # Set the temperature in °C
|
||||
TEMP_SET_F = "temp_set_f" # Set the temperature in °F
|
||||
TEMP_SETTING_QUICK_C = "temp_setting_quick_c"
|
||||
TEMP_UNIT_CONVERT = "temp_unit_convert" # Temperature unit switching
|
||||
TEMP_VALUE = "temp_value" # Color temperature
|
||||
TEMP_VALUE_V2 = "temp_value_v2"
|
||||
@@ -993,7 +992,6 @@ class DPCode(StrEnum):
|
||||
WORK_POWER = "work_power"
|
||||
WORK_STATE = "work_state"
|
||||
WORK_STATE_E = "work_state_e"
|
||||
WORK_TYPE = "work_type"
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -19,18 +19,6 @@ from .entity import TuyaEntity
|
||||
# All descriptions can be found here. Mostly the Enum data types in the
|
||||
# default instructions set of each category end up being a select.
|
||||
SELECTS: dict[DeviceCategory, tuple[SelectEntityDescription, ...]] = {
|
||||
DeviceCategory.BH: (
|
||||
SelectEntityDescription(
|
||||
key=DPCode.TEMP_SETTING_QUICK_C,
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
translation_key="quick_heat_temperature",
|
||||
),
|
||||
SelectEntityDescription(
|
||||
key=DPCode.WORK_TYPE,
|
||||
entity_category=EntityCategory.CONFIG,
|
||||
translation_key="kettle_work_mode",
|
||||
),
|
||||
),
|
||||
DeviceCategory.CL: (
|
||||
SelectEntityDescription(
|
||||
key=DPCode.CONTROL_BACK_MODE,
|
||||
|
||||
@@ -478,15 +478,6 @@
|
||||
"1": "Continuous working mode"
|
||||
}
|
||||
},
|
||||
"kettle_work_mode": {
|
||||
"name": "Work mode",
|
||||
"state": {
|
||||
"boiling_quick": "Quick boil",
|
||||
"setting_quick": "Quick heat",
|
||||
"temp_boiling": "Boil and keep warm",
|
||||
"temp_setting": "Heat and keep warm"
|
||||
}
|
||||
},
|
||||
"led_type": {
|
||||
"name": "Light source type",
|
||||
"state": {
|
||||
@@ -524,16 +515,6 @@
|
||||
"smart": "Smart"
|
||||
}
|
||||
},
|
||||
"quick_heat_temperature": {
|
||||
"name": "Quick heat temperature",
|
||||
"state": {
|
||||
"80": "80 °C",
|
||||
"85": "85 °C",
|
||||
"90": "90 °C",
|
||||
"95": "95 °C",
|
||||
"100": "100 °C"
|
||||
}
|
||||
},
|
||||
"record_mode": {
|
||||
"name": "Record mode",
|
||||
"state": {
|
||||
|
||||
@@ -39,7 +39,7 @@ habluetooth==6.7.9
|
||||
hass-nabucasa==2.2.0
|
||||
hassil==3.5.0
|
||||
home-assistant-bluetooth==2.0.0
|
||||
home-assistant-frontend==20260527.0
|
||||
home-assistant-frontend==20260429.4
|
||||
home-assistant-intents==2026.5.5
|
||||
httpx==0.28.1
|
||||
ifaddr==0.2.0
|
||||
|
||||
+1
-1
@@ -5,7 +5,7 @@ To update, run python3 -m script.hassfest
|
||||
|
||||
from typing import Final
|
||||
|
||||
FRONTEND_VERSION: Final[str] = "20260527.0"
|
||||
FRONTEND_VERSION: Final[str] = "20260429.4"
|
||||
|
||||
MDI_ICONS: Final[set[str]] = {
|
||||
"ab-testing",
|
||||
|
||||
Generated
+1
-1
@@ -1266,7 +1266,7 @@ hole==0.9.0
|
||||
holidays==0.97
|
||||
|
||||
# homeassistant.components.frontend
|
||||
home-assistant-frontend==20260527.0
|
||||
home-assistant-frontend==20260429.4
|
||||
|
||||
# homeassistant.components.conversation
|
||||
home-assistant-intents==2026.5.5
|
||||
|
||||
@@ -2088,6 +2088,36 @@ async def test_receive_backup_path_traversal(
|
||||
assert resp.status == 400
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"name",
|
||||
[
|
||||
"/absolute/path",
|
||||
"../parent",
|
||||
"with/slash",
|
||||
],
|
||||
)
|
||||
async def test_receive_backup_rejects_unsafe_inner_name(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
name: str,
|
||||
) -> None:
|
||||
"""Test receive backup rejects an inner name that would escape the backup dir."""
|
||||
await setup_backup_integration(hass)
|
||||
client = await hass_client()
|
||||
|
||||
backup = replace(TEST_BACKUP_ABC123, name=name)
|
||||
with patch(
|
||||
"homeassistant.components.backup.manager.read_backup",
|
||||
return_value=backup,
|
||||
):
|
||||
resp = await client.post(
|
||||
"/api/backup/upload?agent_id=backup.local",
|
||||
data={"file": StringIO("test")},
|
||||
)
|
||||
|
||||
assert resp.status == 400
|
||||
|
||||
|
||||
async def test_receive_backup_busy_manager(
|
||||
hass: HomeAssistant,
|
||||
hass_client: ClientSessionGenerator,
|
||||
|
||||
@@ -14,6 +14,7 @@ import pytest
|
||||
import securetar
|
||||
|
||||
from homeassistant.components.backup import DOMAIN, AddonInfo, AgentBackup, Folder
|
||||
from homeassistant.components.backup.models import InvalidBackupFilename
|
||||
from homeassistant.components.backup.util import (
|
||||
DecryptedBackupStreamer,
|
||||
EncryptedBackupStreamer,
|
||||
@@ -158,6 +159,37 @@ def test_read_backup(backup_json_content: bytes, expected_backup: AgentBackup) -
|
||||
assert backup == expected_backup
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"name",
|
||||
[
|
||||
"/absolute/path",
|
||||
"../parent",
|
||||
"with/slash",
|
||||
"with\\backslash",
|
||||
"C:\\drive\\path",
|
||||
"",
|
||||
".",
|
||||
"..",
|
||||
],
|
||||
)
|
||||
def test_read_backup_rejects_unsafe_name(name: str) -> None:
|
||||
"""Test that read_backup rejects names that could escape the backup directory."""
|
||||
backup_json_content = (
|
||||
b'{"compressed":true,"date":"2024-12-02T07:23:58.261875-05:00","homeassistant":'
|
||||
b'{"exclude_database":true,"version":"2024.12.0.dev0"},"name":"'
|
||||
+ name.encode().replace(b"\\", b"\\\\")
|
||||
+ b'","protected":true,"slug":"455645fe","type":"partial","version":2}'
|
||||
)
|
||||
mock_path = Mock()
|
||||
mock_path.stat.return_value.st_size = 1234
|
||||
|
||||
with patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar:
|
||||
tar_ctx = mock_open_tar.return_value.__enter__.return_value
|
||||
tar_ctx.extractfile.return_value.read.return_value = backup_json_content
|
||||
with pytest.raises(InvalidBackupFilename):
|
||||
read_backup(mock_path)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("backup", "password", "validation_result", "expected_messages"),
|
||||
[
|
||||
|
||||
@@ -5637,128 +5637,6 @@
|
||||
'state': 'low',
|
||||
})
|
||||
# ---
|
||||
# name: test_platform_setup_and_discovery[select.smart_kettle_quick_heat_temperature-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': list([
|
||||
None,
|
||||
]),
|
||||
'area_id': None,
|
||||
'capabilities': dict({
|
||||
'options': list([
|
||||
'85',
|
||||
'90',
|
||||
]),
|
||||
}),
|
||||
'config_entry_id': <ANY>,
|
||||
'config_subentry_id': <ANY>,
|
||||
'device_class': None,
|
||||
'device_id': <ANY>,
|
||||
'disabled_by': None,
|
||||
'domain': 'select',
|
||||
'entity_category': <EntityCategory.CONFIG: 'config'>,
|
||||
'entity_id': 'select.smart_kettle_quick_heat_temperature',
|
||||
'has_entity_name': True,
|
||||
'hidden_by': None,
|
||||
'icon': None,
|
||||
'id': <ANY>,
|
||||
'labels': set({
|
||||
}),
|
||||
'name': None,
|
||||
'object_id_base': 'Quick heat temperature',
|
||||
'options': dict({
|
||||
}),
|
||||
'original_device_class': None,
|
||||
'original_icon': None,
|
||||
'original_name': 'Quick heat temperature',
|
||||
'platform': 'tuya',
|
||||
'previous_unique_id': None,
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': 'quick_heat_temperature',
|
||||
'unique_id': 'tuya.s5ah3novtabe4tfdhbtemp_setting_quick_c',
|
||||
'unit_of_measurement': None,
|
||||
})
|
||||
# ---
|
||||
# name: test_platform_setup_and_discovery[select.smart_kettle_quick_heat_temperature-state]
|
||||
StateSnapshot({
|
||||
'attributes': ReadOnlyDict({
|
||||
'friendly_name': 'Smart Kettle Quick heat temperature',
|
||||
'options': list([
|
||||
'85',
|
||||
'90',
|
||||
]),
|
||||
}),
|
||||
'context': <ANY>,
|
||||
'entity_id': 'select.smart_kettle_quick_heat_temperature',
|
||||
'last_changed': <ANY>,
|
||||
'last_reported': <ANY>,
|
||||
'last_updated': <ANY>,
|
||||
'state': '85',
|
||||
})
|
||||
# ---
|
||||
# name: test_platform_setup_and_discovery[select.smart_kettle_work_mode-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': list([
|
||||
None,
|
||||
]),
|
||||
'area_id': None,
|
||||
'capabilities': dict({
|
||||
'options': list([
|
||||
'setting_quick',
|
||||
'boiling_quick',
|
||||
'temp_setting',
|
||||
'temp_boiling',
|
||||
]),
|
||||
}),
|
||||
'config_entry_id': <ANY>,
|
||||
'config_subentry_id': <ANY>,
|
||||
'device_class': None,
|
||||
'device_id': <ANY>,
|
||||
'disabled_by': None,
|
||||
'domain': 'select',
|
||||
'entity_category': <EntityCategory.CONFIG: 'config'>,
|
||||
'entity_id': 'select.smart_kettle_work_mode',
|
||||
'has_entity_name': True,
|
||||
'hidden_by': None,
|
||||
'icon': None,
|
||||
'id': <ANY>,
|
||||
'labels': set({
|
||||
}),
|
||||
'name': None,
|
||||
'object_id_base': 'Work mode',
|
||||
'options': dict({
|
||||
}),
|
||||
'original_device_class': None,
|
||||
'original_icon': None,
|
||||
'original_name': 'Work mode',
|
||||
'platform': 'tuya',
|
||||
'previous_unique_id': None,
|
||||
'suggested_object_id': None,
|
||||
'supported_features': 0,
|
||||
'translation_key': 'kettle_work_mode',
|
||||
'unique_id': 'tuya.s5ah3novtabe4tfdhbwork_type',
|
||||
'unit_of_measurement': None,
|
||||
})
|
||||
# ---
|
||||
# name: test_platform_setup_and_discovery[select.smart_kettle_work_mode-state]
|
||||
StateSnapshot({
|
||||
'attributes': ReadOnlyDict({
|
||||
'friendly_name': 'Smart Kettle Work mode',
|
||||
'options': list([
|
||||
'setting_quick',
|
||||
'boiling_quick',
|
||||
'temp_setting',
|
||||
'temp_boiling',
|
||||
]),
|
||||
}),
|
||||
'context': <ANY>,
|
||||
'entity_id': 'select.smart_kettle_work_mode',
|
||||
'last_changed': <ANY>,
|
||||
'last_reported': <ANY>,
|
||||
'last_updated': <ANY>,
|
||||
'state': 'setting_quick',
|
||||
})
|
||||
# ---
|
||||
# name: test_platform_setup_and_discovery[select.smart_odor_eliminator_pro_odor_elimination_mode-entry]
|
||||
EntityRegistryEntrySnapshot({
|
||||
'aliases': list([
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Test methods in backup_restore."""
|
||||
|
||||
from io import BytesIO
|
||||
import json
|
||||
from pathlib import Path
|
||||
import tarfile
|
||||
@@ -387,8 +386,8 @@ def test_restore_backup(
|
||||
}
|
||||
|
||||
|
||||
def test_restore_backup_rejects_unsafe_files(tmp_path: Path) -> None:
|
||||
"""Test that a backup with unsafe paths is rejected."""
|
||||
def test_restore_backup_filter_files(tmp_path: Path) -> None:
|
||||
"""Test filtering dangerous files when restoring a backup."""
|
||||
backup_file_path = tmp_path / "backups" / "test.tar"
|
||||
backup_file_path.parent.mkdir()
|
||||
get_fixture_path(
|
||||
@@ -411,55 +410,7 @@ def test_restore_backup_rejects_unsafe_files(tmp_path: Path) -> None:
|
||||
"data/home-assistant_v2.db-wal",
|
||||
}
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"homeassistant.backup_restore.restore_backup_file_content",
|
||||
return_value=backup_restore.RestoreBackupFileContent(
|
||||
backup_file_path=backup_file_path,
|
||||
password=None,
|
||||
remove_after_restore=False,
|
||||
restore_database=True,
|
||||
restore_homeassistant=True,
|
||||
),
|
||||
),
|
||||
pytest.raises(tarfile.FilterError),
|
||||
):
|
||||
backup_restore.restore_backup(tmp_path.as_posix())
|
||||
|
||||
result = restore_result_file_content(tmp_path)
|
||||
assert result is not None
|
||||
assert result["success"] is False
|
||||
assert result["error_type"] in {"AbsolutePathError", "OutsideDestinationError"}
|
||||
|
||||
|
||||
def test_restore_backup_rejects_absolute_symlink(tmp_path: Path) -> None:
|
||||
"""Test rejection of a symlink whose linkname escapes the destination.
|
||||
|
||||
A SYMTYPE entry followed by a regular file whose name traverses the
|
||||
symlink would otherwise land attacker-controlled bytes outside the
|
||||
extraction directory. The tar filter resolves the path after the
|
||||
symlink and rejects the entry.
|
||||
"""
|
||||
backup_file_path = tmp_path / "backups" / "test.tar"
|
||||
backup_file_path.parent.mkdir()
|
||||
|
||||
with tarfile.open(backup_file_path, "w") as tar:
|
||||
backup_json = json.dumps(
|
||||
{"homeassistant": {"version": "0.0.0"}, "compressed": False}
|
||||
).encode()
|
||||
info = tarfile.TarInfo(name="./backup.json")
|
||||
info.size = len(backup_json)
|
||||
tar.addfile(info, BytesIO(backup_json))
|
||||
|
||||
symlink = tarfile.TarInfo(name="pwn")
|
||||
symlink.type = tarfile.SYMTYPE
|
||||
symlink.linkname = "/tmp" # noqa: S108
|
||||
tar.addfile(symlink)
|
||||
|
||||
payload = b"pwned"
|
||||
evil = tarfile.TarInfo(name="pwn/ha_escape_target")
|
||||
evil.size = len(payload)
|
||||
tar.addfile(evil, BytesIO(payload))
|
||||
real_extractone = tarfile.TarFile._extract_one
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
@@ -472,11 +423,27 @@ def test_restore_backup_rejects_absolute_symlink(tmp_path: Path) -> None:
|
||||
restore_homeassistant=True,
|
||||
),
|
||||
),
|
||||
pytest.raises(tarfile.FilterError),
|
||||
mock.patch(
|
||||
"tarfile.TarFile._extract_one", autospec=True, wraps=real_extractone
|
||||
) as extractone_mock,
|
||||
):
|
||||
backup_restore.restore_backup(tmp_path.as_posix())
|
||||
assert backup_restore.restore_backup(tmp_path.as_posix()) is True
|
||||
|
||||
assert not Path("/tmp/ha_escape_target").exists() # noqa: S108
|
||||
# Check the unsafe files are not extracted, and that the safe files are extracted
|
||||
extracted_files = {call.args[1].name for call in extractone_mock.mock_calls}
|
||||
assert extracted_files == {
|
||||
"./backup.json", # From the outer tar
|
||||
"homeassistant.tar.gz", # From the outer tar
|
||||
".",
|
||||
"data",
|
||||
"data/home-assistant_v2.db",
|
||||
"data/home-assistant_v2.db-wal",
|
||||
}
|
||||
assert restore_result_file_content(tmp_path) == {
|
||||
"error": None,
|
||||
"error_type": None,
|
||||
"success": True,
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.parametrize(("remove_after_restore"), [True, False])
|
||||
|
||||
Reference in New Issue
Block a user