Compare commits

...

5 Commits

Author SHA1 Message Date
Erik
7ae3e0a4ee Set default securetar version in constant 2026-02-23 09:41:44 +01:00
Erik
01058c4284 Check for expected log message in util.validate_password tests 2026-02-23 09:39:23 +01:00
Erik
50623b4dfd Correct error handling in validate_password util 2026-02-23 09:32:15 +01:00
Erik
5084614b76 Update pyproject.toml 2026-02-17 08:49:41 +01:00
Erik
50edb21ec7 Bump securetar to 2026.2.0 2026-02-17 08:11:04 +01:00
13 changed files with 131 additions and 162 deletions

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
import hashlib
import json
import logging
from pathlib import Path
@@ -40,17 +39,6 @@ class RestoreBackupFileContent:
restore_homeassistant: bool
def password_to_key(password: str) -> bytes:
"""Generate a AES Key from password.
Matches the implementation in supervisor.backups.utils.password_to_key.
"""
key: bytes = password.encode()
for _ in range(100):
key = hashlib.sha256(key).digest()
return key[:16]
def restore_backup_file_content(config_dir: Path) -> RestoreBackupFileContent | None:
"""Return the contents of the restore backup file."""
instruction_path = config_dir.joinpath(RESTORE_BACKUP_FILE)
@@ -96,15 +84,14 @@ def _extract_backup(
"""Extract the backup file to the config directory."""
with (
TemporaryDirectory() as tempdir,
securetar.SecureTarFile(
securetar.SecureTarArchive(
restore_content.backup_file_path,
gzip=False,
mode="r",
) as ostf,
):
ostf.extractall(
ostf.tar.extractall(
path=Path(tempdir, "extracted"),
members=securetar.secure_path(ostf),
members=securetar.secure_path(ostf.tar),
filter="fully_trusted",
)
backup_meta_file = Path(tempdir, "extracted", "backup.json")
@@ -126,10 +113,7 @@ def _extract_backup(
f"homeassistant.tar{'.gz' if backup_meta['compressed'] else ''}",
),
gzip=backup_meta["compressed"],
key=password_to_key(restore_content.password)
if restore_content.password is not None
else None,
mode="r",
password=restore_content.password,
) as istf:
istf.extractall(
path=Path(tempdir, "homeassistant"),

View File

@@ -33,3 +33,5 @@ EXCLUDE_DATABASE_FROM_BACKUP = [
"home-assistant_v2.db",
"home-assistant_v2.db-wal",
]
SECURETAR_CREATE_VERSION = 2

View File

@@ -20,13 +20,9 @@ import time
from typing import IO, TYPE_CHECKING, Any, Protocol, TypedDict, cast
import aiohttp
from securetar import SecureTarFile, atomic_contents_add
from securetar import SecureTarArchive, atomic_contents_add
from homeassistant.backup_restore import (
RESTORE_BACKUP_FILE,
RESTORE_BACKUP_RESULT_FILE,
password_to_key,
)
from homeassistant.backup_restore import RESTORE_BACKUP_FILE, RESTORE_BACKUP_RESULT_FILE
from homeassistant.const import __version__ as HAVERSION
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import (
@@ -60,6 +56,7 @@ from .const import (
EXCLUDE_DATABASE_FROM_BACKUP,
EXCLUDE_FROM_BACKUP,
LOGGER,
SECURETAR_CREATE_VERSION,
)
from .models import (
AddonInfo,
@@ -1858,20 +1855,22 @@ class CoreBackupReaderWriter(BackupReaderWriter):
return False
outer_secure_tarfile = SecureTarFile(
tar_file_path, "w", gzip=False, bufsize=BUF_SIZE
)
with outer_secure_tarfile as outer_secure_tarfile_tarfile:
with SecureTarArchive(
tar_file_path,
"w",
bufsize=BUF_SIZE,
create_version=SECURETAR_CREATE_VERSION,
password=password,
) as outer_secure_tarfile:
raw_bytes = json_bytes(backup_data)
fileobj = io.BytesIO(raw_bytes)
tar_info = tarfile.TarInfo(name="./backup.json")
tar_info.size = len(raw_bytes)
tar_info.mtime = int(time.time())
outer_secure_tarfile_tarfile.addfile(tar_info, fileobj=fileobj)
with outer_secure_tarfile.create_inner_tar(
outer_secure_tarfile.tar.addfile(tar_info, fileobj=fileobj)
with outer_secure_tarfile.create_tar(
"./homeassistant.tar.gz",
gzip=True,
key=password_to_key(password) if password is not None else None,
) as core_tar:
atomic_contents_add(
tar_file=core_tar,

View File

@@ -8,6 +8,6 @@
"integration_type": "service",
"iot_class": "calculated",
"quality_scale": "internal",
"requirements": ["cronsim==2.7", "securetar==2025.2.1"],
"requirements": ["cronsim==2.7", "securetar==2026.2.0"],
"single_config_entry": true
}

View File

@@ -8,7 +8,6 @@ import copy
from dataclasses import dataclass, replace
from io import BytesIO
import json
import os
from pathlib import Path, PurePath
from queue import SimpleQueue
import tarfile
@@ -16,9 +15,14 @@ import threading
from typing import IO, Any, cast
import aiohttp
from securetar import SecureTarError, SecureTarFile, SecureTarReadError
from securetar import (
SecureTarArchive,
SecureTarError,
SecureTarFile,
SecureTarReadError,
SecureTarRootKeyContext,
)
from homeassistant.backup_restore import password_to_key
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.util import dt as dt_util
@@ -29,7 +33,7 @@ from homeassistant.util.async_iterator import (
)
from homeassistant.util.json import JsonObjectType, json_loads_object
from .const import BUF_SIZE, LOGGER
from .const import BUF_SIZE, LOGGER, SECURETAR_CREATE_VERSION
from .models import AddonInfo, AgentBackup, Folder
@@ -132,17 +136,23 @@ def suggested_filename(backup: AgentBackup) -> str:
def validate_password(path: Path, password: str | None) -> bool:
"""Validate the password."""
with tarfile.open(path, "r:", bufsize=BUF_SIZE) as backup_file:
"""Validate the password.
This assumes every inner tar is encrypted with the same secure tar version and
same password.
"""
with SecureTarArchive(
path, "r", bufsize=BUF_SIZE, password=password
) as backup_file:
compressed = False
ha_tar_name = "homeassistant.tar"
try:
ha_tar = backup_file.extractfile(ha_tar_name)
ha_tar = backup_file.tar.extractfile(ha_tar_name)
except KeyError:
compressed = True
ha_tar_name = "homeassistant.tar.gz"
try:
ha_tar = backup_file.extractfile(ha_tar_name)
ha_tar = backup_file.tar.extractfile(ha_tar_name)
except KeyError:
LOGGER.error("No homeassistant.tar or homeassistant.tar.gz found")
return False
@@ -150,13 +160,12 @@ def validate_password(path: Path, password: str | None) -> bool:
with SecureTarFile(
path, # Not used
gzip=compressed,
key=password_to_key(password) if password is not None else None,
mode="r",
password=password,
fileobj=ha_tar,
):
# If we can read the tar file, the password is correct
return True
except tarfile.ReadError:
except tarfile.ReadError, SecureTarReadError:
LOGGER.debug("Invalid password")
return False
except Exception: # noqa: BLE001
@@ -168,22 +177,23 @@ def validate_password_stream(
input_stream: IO[bytes],
password: str | None,
) -> None:
"""Decrypt a backup."""
with (
tarfile.open(fileobj=input_stream, mode="r|", bufsize=BUF_SIZE) as input_tar,
):
for obj in input_tar:
"""Validate the password.
This assumes every inner tar is encrypted with the same secure tar version and
same password.
"""
with SecureTarArchive(
fileobj=input_stream,
mode="r",
bufsize=BUF_SIZE,
streaming=True,
password=password,
) as input_archive:
for obj in input_archive.tar:
if not obj.name.endswith((".tar", ".tgz", ".tar.gz")):
continue
istf = SecureTarFile(
None, # Not used
gzip=False,
key=password_to_key(password) if password is not None else None,
mode="r",
fileobj=input_tar.extractfile(obj),
)
with istf.decrypt(obj) as decrypted:
if istf.securetar_header.plaintext_size is None:
with input_archive.extract_tar(obj) as decrypted:
if decrypted.plaintext_size is None:
raise UnsupportedSecureTarVersion
try:
decrypted.read(1) # Read a single byte to trigger the decryption
@@ -212,21 +222,25 @@ def decrypt_backup(
password: str | None,
on_done: Callable[[Exception | None], None],
minimum_size: int,
nonces: NonceGenerator,
key_context: SecureTarRootKeyContext,
) -> None:
"""Decrypt a backup."""
error: Exception | None = None
try:
try:
with (
tarfile.open(
fileobj=input_stream, mode="r|", bufsize=BUF_SIZE
) as input_tar,
SecureTarArchive(
fileobj=input_stream,
mode="r",
bufsize=BUF_SIZE,
streaming=True,
password=password,
) as input_archive,
tarfile.open(
fileobj=output_stream, mode="w|", bufsize=BUF_SIZE
) as output_tar,
):
_decrypt_backup(backup, input_tar, output_tar, password)
_decrypt_backup(backup, input_archive, output_tar)
except (DecryptError, SecureTarError, tarfile.TarError) as err:
LOGGER.warning("Error decrypting backup: %s", err)
error = err
@@ -248,19 +262,18 @@ def decrypt_backup(
def _decrypt_backup(
backup: AgentBackup,
input_tar: tarfile.TarFile,
input_archive: SecureTarArchive,
output_tar: tarfile.TarFile,
password: str | None,
) -> None:
"""Decrypt a backup."""
expected_archives = _get_expected_archives(backup)
for obj in input_tar:
for obj in input_archive.tar:
# We compare with PurePath to avoid issues with different path separators,
# for example when backup.json is added as "./backup.json"
object_path = PurePath(obj.name)
if object_path == PurePath("backup.json"):
# Rewrite the backup.json file to indicate that the backup is decrypted
if not (reader := input_tar.extractfile(obj)):
if not (reader := input_archive.tar.extractfile(obj)):
raise DecryptError
metadata = json_loads_object(reader.read())
metadata["protected"] = False
@@ -272,21 +285,15 @@ def _decrypt_backup(
prefix, _, suffix = object_path.name.partition(".")
if suffix not in ("tar", "tgz", "tar.gz"):
LOGGER.debug("Unknown file %s will not be decrypted", obj.name)
output_tar.addfile(obj, input_tar.extractfile(obj))
output_tar.addfile(obj, input_archive.tar.extractfile(obj))
continue
if prefix not in expected_archives:
LOGGER.debug("Unknown inner tar file %s will not be decrypted", obj.name)
output_tar.addfile(obj, input_tar.extractfile(obj))
output_tar.addfile(obj, input_archive.tar.extractfile(obj))
continue
istf = SecureTarFile(
None, # Not used
gzip=False,
key=password_to_key(password) if password is not None else None,
mode="r",
fileobj=input_tar.extractfile(obj),
)
with istf.decrypt(obj) as decrypted:
if (plaintext_size := istf.securetar_header.plaintext_size) is None:
with input_archive.extract_tar(obj) as decrypted:
# Guard against SecureTar v1 which doesn't store plaintext size
if (plaintext_size := decrypted.plaintext_size) is None:
raise UnsupportedSecureTarVersion
decrypted_obj = copy.deepcopy(obj)
decrypted_obj.size = plaintext_size
@@ -300,7 +307,7 @@ def encrypt_backup(
password: str | None,
on_done: Callable[[Exception | None], None],
minimum_size: int,
nonces: NonceGenerator,
key_context: SecureTarRootKeyContext,
) -> None:
"""Encrypt a backup."""
error: Exception | None = None
@@ -310,11 +317,16 @@ def encrypt_backup(
tarfile.open(
fileobj=input_stream, mode="r|", bufsize=BUF_SIZE
) as input_tar,
tarfile.open(
fileobj=output_stream, mode="w|", bufsize=BUF_SIZE
) as output_tar,
SecureTarArchive(
fileobj=output_stream,
mode="w",
bufsize=BUF_SIZE,
streaming=True,
root_key_context=key_context,
create_version=SECURETAR_CREATE_VERSION,
) as output_archive,
):
_encrypt_backup(backup, input_tar, output_tar, password, nonces)
_encrypt_backup(backup, input_tar, output_archive)
except (EncryptError, SecureTarError, tarfile.TarError) as err:
LOGGER.warning("Error encrypting backup: %s", err)
error = err
@@ -337,9 +349,7 @@ def encrypt_backup(
def _encrypt_backup(
backup: AgentBackup,
input_tar: tarfile.TarFile,
output_tar: tarfile.TarFile,
password: str | None,
nonces: NonceGenerator,
output_archive: SecureTarArchive,
) -> None:
"""Encrypt a backup."""
inner_tar_idx = 0
@@ -357,29 +367,20 @@ def _encrypt_backup(
updated_metadata_b = json.dumps(metadata).encode()
metadata_obj = copy.deepcopy(obj)
metadata_obj.size = len(updated_metadata_b)
output_tar.addfile(metadata_obj, BytesIO(updated_metadata_b))
output_archive.tar.addfile(metadata_obj, BytesIO(updated_metadata_b))
continue
prefix, _, suffix = object_path.name.partition(".")
if suffix not in ("tar", "tgz", "tar.gz"):
LOGGER.debug("Unknown file %s will not be encrypted", obj.name)
output_tar.addfile(obj, input_tar.extractfile(obj))
output_archive.tar.addfile(obj, input_tar.extractfile(obj))
continue
if prefix not in expected_archives:
LOGGER.debug("Unknown inner tar file %s will not be encrypted", obj.name)
continue
istf = SecureTarFile(
None, # Not used
gzip=False,
key=password_to_key(password) if password is not None else None,
mode="r",
fileobj=input_tar.extractfile(obj),
nonce=nonces.get(inner_tar_idx),
output_archive.import_tar(
input_tar.extractfile(obj), obj, derived_key_id=inner_tar_idx
)
inner_tar_idx += 1
with istf.encrypt(obj) as encrypted:
encrypted_obj = copy.deepcopy(obj)
encrypted_obj.size = encrypted.encrypted_size
output_tar.addfile(encrypted_obj, encrypted)
@dataclass(kw_only=True)
@@ -391,21 +392,6 @@ class _CipherWorkerStatus:
writer: AsyncIteratorWriter
class NonceGenerator:
"""Generate nonces for encryption."""
def __init__(self) -> None:
"""Initialize the generator."""
self._nonces: dict[int, bytes] = {}
def get(self, index: int) -> bytes:
"""Get a nonce for the given index."""
if index not in self._nonces:
# Generate a new nonce for the given index
self._nonces[index] = os.urandom(16)
return self._nonces[index]
class _CipherBackupStreamer:
"""Encrypt or decrypt a backup."""
@@ -417,7 +403,7 @@ class _CipherBackupStreamer:
str | None,
Callable[[Exception | None], None],
int,
NonceGenerator,
SecureTarRootKeyContext,
],
None,
]
@@ -435,7 +421,7 @@ class _CipherBackupStreamer:
self._hass = hass
self._open_stream = open_stream
self._password = password
self._nonces = NonceGenerator()
self._key_context = SecureTarRootKeyContext(password)
def size(self) -> int:
"""Return the maximum size of the decrypted or encrypted backup."""
@@ -466,7 +452,7 @@ class _CipherBackupStreamer:
self._password,
on_done,
self.size(),
self._nonces,
self._key_context,
],
)
worker_status = _CipherWorkerStatus(

View File

@@ -64,7 +64,7 @@ python-slugify==8.0.4
PyTurboJPEG==1.8.0
PyYAML==6.0.3
requests==2.32.5
securetar==2025.2.1
securetar==2026.2.0
SQLAlchemy==2.0.41
standard-aifc==3.13.0
standard-telnetlib==3.13.0

View File

@@ -71,7 +71,7 @@ dependencies = [
"python-slugify==8.0.4",
"PyYAML==6.0.3",
"requests==2.32.5",
"securetar==2025.2.1",
"securetar==2026.2.0",
"SQLAlchemy==2.0.41",
"standard-aifc==3.13.0",
"standard-telnetlib==3.13.0",

2
requirements.txt generated
View File

@@ -47,7 +47,7 @@ python-slugify==8.0.4
PyTurboJPEG==1.8.0
PyYAML==6.0.3
requests==2.32.5
securetar==2025.2.1
securetar==2026.2.0
SQLAlchemy==2.0.41
standard-aifc==3.13.0
standard-telnetlib==3.13.0

2
requirements_all.txt generated
View File

@@ -2852,7 +2852,7 @@ screenlogicpy==0.10.2
scsgate==0.1.0
# homeassistant.components.backup
securetar==2025.2.1
securetar==2026.2.0
# homeassistant.components.sendgrid
sendgrid==6.8.2

View File

@@ -2403,7 +2403,7 @@ satel-integra==0.3.7
screenlogicpy==0.10.2
# homeassistant.components.backup
securetar==2025.2.1
securetar==2026.2.0
# homeassistant.components.emulated_kasa
# homeassistant.components.sense

View File

@@ -24,7 +24,7 @@ from unittest.mock import (
from freezegun.api import FrozenDateTimeFactory
import pytest
from securetar import SecureTarFile
from securetar import SecureTarArchive, SecureTarFile
from homeassistant.components.backup import (
DOMAIN,
@@ -49,7 +49,6 @@ from homeassistant.components.backup.manager import (
RestoreBackupState,
WrittenBackup,
)
from homeassistant.components.backup.util import password_to_key
from homeassistant.const import EVENT_HOMEASSISTANT_START, EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import CoreState, HomeAssistant
from homeassistant.exceptions import HomeAssistantError
@@ -671,8 +670,7 @@ async def test_initiate_backup(
with SecureTarFile(
fileobj=core_tar_io,
gzip=True,
key=password_to_key(password) if password is not None else None,
mode="r",
password=password,
) as core_tar:
assert set(core_tar.getnames()) == expected_files
@@ -3312,7 +3310,7 @@ async def test_restore_backup_file_error(
@pytest.mark.usefixtures("mock_ha_version")
@pytest.mark.parametrize(
("commands", "agent_ids", "password", "protected_backup", "inner_tar_key"),
("commands", "agent_ids", "password", "protected_backup", "inner_tar_password"),
[
(
[],
@@ -3326,7 +3324,7 @@ async def test_restore_backup_file_error(
["backup.local", "test.remote"],
"hunter2",
{"backup.local": True, "test.remote": True},
password_to_key("hunter2"),
"hunter2",
),
(
[
@@ -3371,7 +3369,7 @@ async def test_restore_backup_file_error(
["backup.local", "test.remote"],
"hunter2",
{"backup.local": True, "test.remote": False},
password_to_key("hunter2"), # Local agent is protected
"hunter2", # Local agent is protected
),
(
[
@@ -3386,7 +3384,7 @@ async def test_restore_backup_file_error(
["backup.local", "test.remote"],
"hunter2",
{"backup.local": True, "test.remote": True},
password_to_key("hunter2"),
"hunter2",
),
(
[
@@ -3416,7 +3414,7 @@ async def test_restore_backup_file_error(
["test.remote"],
"hunter2",
{"test.remote": True},
password_to_key("hunter2"),
"hunter2",
),
(
[
@@ -3431,7 +3429,7 @@ async def test_restore_backup_file_error(
["test.remote"],
"hunter2",
{"test.remote": False},
password_to_key("hunter2"), # Temporary backup protected when password set
"hunter2", # Temporary backup protected when password set
),
],
)
@@ -3443,7 +3441,7 @@ async def test_initiate_backup_per_agent_encryption(
agent_ids: list[str],
password: str | None,
protected_backup: dict[str, bool],
inner_tar_key: bytes | None,
inner_tar_password: str | None,
) -> None:
"""Test generate backup where encryption is selectively set on agents."""
await setup_backup_integration(hass, remote_agents=["test.remote"])
@@ -3479,7 +3477,11 @@ async def test_initiate_backup_per_agent_encryption(
with (
patch("pathlib.Path.open", mock_open(read_data=b"test")),
patch("securetar.SecureTarFile.create_inner_tar") as mock_create_inner_tar,
patch(
"securetar.SecureTarArchive.__init__",
autospec=True,
wraps=SecureTarArchive.__init__,
) as mock_secure_tar_archive,
):
await ws_client.send_json_auto_id(
{
@@ -3504,7 +3506,9 @@ async def test_initiate_backup_per_agent_encryption(
await hass.async_block_till_done()
mock_create_inner_tar.assert_called_once_with(ANY, gzip=True, key=inner_tar_key)
assert mock_secure_tar_archive.mock_calls[0] == call(
ANY, ANY, "w", bufsize=4194304, create_version=2, password=inner_tar_password
)
result = await ws_client.receive_json()
assert result["event"] == {

View File

@@ -160,15 +160,25 @@ def test_validate_password(
@pytest.mark.parametrize("password", [None, "hunter2"])
@pytest.mark.parametrize("secure_tar_side_effect", [tarfile.ReadError, Exception])
@pytest.mark.parametrize(
("secure_tar_side_effect", "expected_message"),
[
(tarfile.ReadError, "Invalid password"),
(securetar.SecureTarReadError, "Invalid password"),
(Exception, "Unexpected error validating password"),
],
)
def test_validate_password_with_error(
password: str | None, secure_tar_side_effect: type[Exception]
password: str | None,
secure_tar_side_effect: type[Exception],
expected_message: str,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test validating a password."""
mock_path = Mock()
with (
patch("homeassistant.components.backup.util.tarfile.open"),
patch("securetar.tarfile.open"),
patch(
"homeassistant.components.backup.util.SecureTarFile",
) as mock_secure_tar,
@@ -176,19 +186,21 @@ def test_validate_password_with_error(
mock_secure_tar.return_value.__enter__.side_effect = secure_tar_side_effect
assert validate_password(mock_path, password) is False
assert expected_message in caplog.text
def test_validate_password_no_homeassistant() -> None:
def test_validate_password_no_homeassistant(caplog: pytest.LogCaptureFixture) -> None:
"""Test validating a password."""
mock_path = Mock()
with (
patch("homeassistant.components.backup.util.tarfile.open") as mock_open_tar,
patch("securetar.tarfile.open") as mock_open_tar,
):
mock_open_tar.return_value.__enter__.return_value.extractfile.side_effect = (
KeyError
)
mock_open_tar.return_value.extractfile.side_effect = KeyError
assert validate_password(mock_path, "hunter2") is False
assert "No homeassistant.tar or homeassistant.tar.gz found" in caplog.text
@pytest.mark.parametrize(
("addons", "padding_size", "decrypted_backup"),

View File

@@ -463,21 +463,3 @@ def test_remove_backup_file_after_restore(
"error_type": None,
"success": True,
}
@pytest.mark.parametrize(
("password", "expected"),
[
("test", b"\xf0\x9b\xb9\x1f\xdc,\xff\xd5x\xd6\xd6\x8fz\x19.\x0f"),
("lorem ipsum...", b"#\xe0\xfc\xe0\xdb?_\x1f,$\rQ\xf4\xf5\xd8\xfb"),
],
)
def test_pw_to_key(password: str | None, expected: bytes | None) -> None:
"""Test password to key conversion."""
assert backup_restore.password_to_key(password) == expected
def test_pw_to_key_none() -> None:
"""Test password to key conversion."""
with pytest.raises(AttributeError):
backup_restore.password_to_key(None)