Compare commits

..

3 Commits

Author SHA1 Message Date
dependabot[bot] 7236b534a8 Bump github/codeql-action from 4.35.4 to 4.35.5
Bumps [github/codeql-action](https://github.com/github/codeql-action) from 4.35.4 to 4.35.5.
- [Release notes](https://github.com/github/codeql-action/releases)
- [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md)
- [Commits](https://github.com/github/codeql-action/compare/68bde559dea0fdcac2102bfdf6230c5f70eb485e...9e0d7b8d25671d64c341c19c0152d693099fb5ba)

---
updated-dependencies:
- dependency-name: github/codeql-action
  dependency-version: 4.35.5
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-22 06:04:35 +00:00
Max Michels cb54fd4921 Replace duplicate constants with homeassistant.const imports (#171809) 2026-05-22 07:57:08 +02:00
Max Michels b391fc61ea Replace duplicate constants with homeassistant.const imports (#171808) 2026-05-22 07:56:29 +02:00
7 changed files with 36 additions and 683 deletions
+1 -12
View File
@@ -917,23 +917,12 @@ jobs:
key: >-
${{ runner.os }}-${{ runner.arch }}-${{ steps.python.outputs.python-version }}-${{
needs.info.outputs.python_cache_key }}
- name: Restore pytest test counts cache
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: pytest_test_counts.json
key: >-
pytest-counts-${{ runner.os }}-${{ runner.arch }}-${{
steps.python.outputs.python-version }}-${{ github.sha }}
restore-keys: |
pytest-counts-${{ runner.os }}-${{ runner.arch }}-${{ steps.python.outputs.python-version }}-
- name: Run split_tests.py
env:
TEST_GROUP_COUNT: ${{ needs.info.outputs.test_group_count }}
run: |
. venv/bin/activate
python -m script.split_tests \
--cache pytest_test_counts.json \
${TEST_GROUP_COUNT} tests
python -m script.split_tests ${TEST_GROUP_COUNT} tests
- name: Upload pytest_buckets
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
+2 -2
View File
@@ -28,11 +28,11 @@ jobs:
persist-credentials: false
- name: Initialize CodeQL
uses: github/codeql-action/init@68bde559dea0fdcac2102bfdf6230c5f70eb485e # v4.35.4
uses: github/codeql-action/init@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
with:
languages: python
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@68bde559dea0fdcac2102bfdf6230c5f70eb485e # v4.35.4
uses: github/codeql-action/analyze@9e0d7b8d25671d64c341c19c0152d693099fb5ba # v4.35.5
with:
category: "/language:python"
+1 -2
View File
@@ -15,6 +15,7 @@ from homeassistant.components.sensor import (
SensorEntity,
)
from homeassistant.const import (
ATTR_MODEL,
CONF_MAC,
CONF_NAME,
EVENT_HOMEASSISTANT_STOP,
@@ -30,8 +31,6 @@ from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
_LOGGER = logging.getLogger(__name__)
ATTR_DEVICE = "device"
# pylint: disable-next=home-assistant-duplicate-const
ATTR_MODEL = "model"
BLE_TEMP_HANDLE = 0x24
BLE_TEMP_UUID = "0000ff92-0000-1000-8000-00805f9b34fb"
-4
View File
@@ -38,12 +38,8 @@ PLATFORMS = [
Platform.SENSOR,
]
# pylint: disable-next=home-assistant-duplicate-const
SERVICE_LOCK = "lock"
SERVICE_REMOTE_START = "remote_start"
SERVICE_REMOTE_STOP = "remote_stop"
# pylint: disable-next=home-assistant-duplicate-const
SERVICE_UNLOCK = "unlock"
SERVICE_UNLOCK_SPECIFIC_DOOR = "unlock_specific_door"
ATTR_DOOR = "door"
@@ -4,9 +4,10 @@ import logging
from subarulink.exceptions import SubaruException
from homeassistant.const import SERVICE_UNLOCK
from homeassistant.exceptions import HomeAssistantError
from .const import SERVICE_REMOTE_START, SERVICE_UNLOCK, VEHICLE_NAME, VEHICLE_VIN
from .const import SERVICE_REMOTE_START, VEHICLE_NAME, VEHICLE_VIN
_LOGGER = logging.getLogger(__name__)
+30 -279
View File
@@ -4,8 +4,6 @@
import argparse
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
import hashlib
import json
from math import ceil
import os
from pathlib import Path
@@ -17,15 +15,13 @@ from typing import Final
# place to subdivide to keep each pytest invocation roughly equal in size.
_FAN_OUT_DIRS: Final = frozenset({"components"})
# Cache file format version; bump on any incompatible schema change so old
# caches are ignored rather than misread.
_CACHE_VERSION: Final = 1
class Bucket:
"""Class to hold bucket."""
def __init__(self) -> None:
def __init__(
self,
):
"""Initialize bucket."""
self.total_tests = 0
self._paths: list[str] = []
@@ -87,7 +83,7 @@ class BucketHolder:
def create_ouput_file(self) -> None:
"""Create output file."""
with Path("pytest_buckets.txt").open("w", encoding="utf-8") as file:
with Path("pytest_buckets.txt").open("w") as file:
for idx, bucket in enumerate(self._buckets):
print(f"Bucket {idx + 1} has {bucket.total_tests} tests")
file.write(bucket.get_paths_line())
@@ -220,283 +216,44 @@ def _enumerate_batch_paths(path: Path) -> list[Path]:
return paths
def _hash_file(path: Path) -> str:
"""Return a short content hash for ``path``."""
return hashlib.sha256(path.read_bytes()).hexdigest()[:16]
def collect_tests(path: Path) -> TestFolder:
"""Collect all tests."""
batch_paths = _enumerate_batch_paths(path)
if not batch_paths:
print(f"No eligible test paths found under {path}")
sys.exit(1)
workers = min(len(batch_paths), os.cpu_count() or 1) or 1
# Round-robin chunking keeps batches roughly balanced when path
# ordering correlates with test size.
batches = [batch_paths[i::workers] for i in range(workers)]
def _walk_test_tree(root: Path) -> tuple[list[Path], list[Path]]:
"""Walk ``root`` once and return (test files, conftest files).
Uses ``os.walk`` rather than ``Path.rglob`` because it's ~2x faster on
a 5000-file tree, and we prune hidden/dunder subdirectories instead of
visiting them. Doing both walks in one pass keeps total tree I/O down.
"""
if root.is_file():
if root.name.startswith("test_") and root.suffix == ".py":
return [root], []
return [], []
test_files: list[Path] = []
conftests: list[Path] = []
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if not d.startswith((".", "_"))]
base = Path(dirpath)
for name in filenames:
if name == "conftest.py":
conftests.append(base / name)
elif name.startswith("test_") and name.endswith(".py"):
test_files.append(base / name)
test_files.sort()
conftests.sort()
return test_files, conftests
def _compute_conftest_hash(root: Path, conftests: list[Path]) -> str:
"""Return a hash that changes whenever any conftest.py under ``root`` changes.
Any change to a conftest invalidates the entire test-count cache. This is
coarse but safe: conftests can change fixture parametrization in ways the
cache cannot otherwise detect, so we just re-collect everything.
"""
digest = hashlib.sha256()
for conftest in conftests:
digest.update(str(conftest.relative_to(root)).encode())
digest.update(b"\0")
digest.update(conftest.read_bytes())
digest.update(b"\0")
return digest.hexdigest()
@dataclass
class _CacheEntry:
"""Cached test count for a single file."""
hash: str
count: int
@dataclass
class _Cache:
"""Mapping of test file path → cached entry, plus invalidation key."""
conftest_hash: str
entries: dict[str, _CacheEntry]
@classmethod
def empty(cls, conftest_hash: str = "") -> _Cache:
"""Return a new empty cache."""
return cls(conftest_hash=conftest_hash, entries={})
@classmethod
def load(cls, path: Path, current_conftest_hash: str) -> _Cache:
"""Load cache from ``path`` and invalidate it on schema/conftest drift.
Any failure (missing file, bad JSON, version drift, conftest drift)
returns an empty cache so the script just falls back to a full
collection. This is the self-healing path.
"""
try:
raw = json.loads(path.read_bytes())
except OSError, ValueError:
return cls.empty(current_conftest_hash)
if not isinstance(raw, dict) or raw.get("version") != _CACHE_VERSION:
return cls.empty(current_conftest_hash)
if raw.get("conftest_hash") != current_conftest_hash:
return cls.empty(current_conftest_hash)
files = raw.get("files")
if not isinstance(files, dict):
return cls.empty(current_conftest_hash)
entries: dict[str, _CacheEntry] = {}
for key, value in files.items():
if (
not isinstance(value, dict)
or not isinstance(value.get("hash"), str)
or not isinstance(value.get("count"), int)
):
# Skip malformed entries instead of discarding the whole cache.
continue
entries[key] = _CacheEntry(hash=value["hash"], count=value["count"])
return cls(conftest_hash=current_conftest_hash, entries=entries)
def save(self, path: Path) -> None:
"""Write the cache to ``path``."""
path.write_text(
json.dumps(
{
"version": _CACHE_VERSION,
"conftest_hash": self.conftest_hash,
"files": {
key: {"hash": entry.hash, "count": entry.count}
for key, entry in sorted(self.entries.items())
},
},
indent=2,
ensure_ascii=False,
)
+ "\n",
encoding="utf-8",
)
def _resolve_from_cache(
test_files: list[Path],
cache: _Cache,
root: Path,
) -> tuple[dict[Path, int], list[Path]]:
"""Split ``test_files`` into ``(cached_counts, needs_collection)``.
A file is served from cache when its content hash matches what we
previously stored; otherwise it is queued for re-collection.
"""
cached: dict[Path, int] = {}
misses: list[Path] = []
for file in test_files:
key = str(file.relative_to(root))
entry = cache.entries.get(key)
if entry is None:
misses.append(file)
continue
if entry.hash != _hash_file(file):
misses.append(file)
continue
cached[file] = entry.count
return cached, misses
def _run_collect_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
"""Run pytest --collect-only across ``paths`` using a process pool."""
workers = min(len(paths), os.cpu_count() or 1) or 1
batches = [paths[i::workers] for i in range(workers)]
if workers == 1:
return [_collect_batch(batches[0])]
with ProcessPoolExecutor(max_workers=workers) as executor:
return list(executor.map(_collect_batch, batches))
results = [_collect_batch(batches[0])]
else:
with ProcessPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(_collect_batch, batches))
def _parse_collect_output(stdout: str) -> dict[Path, int]:
"""Parse ``pytest --collect-only -qq`` output into ``{path: count}``."""
counts: dict[Path, int] = {}
for line in stdout.splitlines():
if not line.strip():
continue
file_path, _, total_tests = line.partition(": ")
if not file_path or not total_tests:
raise ValueError(f"Unexpected line: {line}")
counts[Path(file_path)] = int(total_tests)
return counts
def _run_pytest_collect(paths: list[Path]) -> dict[Path, int]:
"""Run pytest --collect-only across ``paths`` and parse the output."""
counts: dict[Path, int] = {}
for stdout, stderr, returncode in _run_collect_batches(paths):
folder = TestFolder(path)
for stdout, stderr, returncode in results:
if returncode != 0:
print("Failed to collect tests:")
print(stderr)
print(stdout)
sys.exit(1)
try:
counts.update(_parse_collect_output(stdout))
except ValueError as err:
print(err)
sys.exit(1)
return counts
for line in stdout.splitlines():
if not line.strip():
continue
file_path, _, total_tests = line.partition(": ")
if not file_path or not total_tests:
print(f"Unexpected line: {line}")
sys.exit(1)
def _collect_tests_uncached(path: Path) -> TestFolder:
"""Collect tests by handing pytest the top-level directories.
Skips the tree walk and per-file hashing; used when no cache file is
requested so the script behaves like the pre-cache implementation.
"""
batch_paths = _enumerate_batch_paths(path)
if not batch_paths:
print(f"No eligible test paths found under {path}")
sys.exit(1)
folder = TestFolder(path)
for file_path, total_tests in _run_pytest_collect(batch_paths).items():
folder.add_test_file(TestFile(total_tests, file_path))
return folder
def _collect_tests_cached(path: Path, cache_path: Path) -> TestFolder:
"""Collect tests using an on-disk cache for incremental updates."""
all_test_files, conftests = _walk_test_tree(path)
if not all_test_files:
print(f"No eligible test paths found under {path}")
sys.exit(1)
conftest_hash = _compute_conftest_hash(path, conftests)
cache = _Cache.load(cache_path, conftest_hash)
cached_counts, missing = _resolve_from_cache(all_test_files, cache, path)
print(
f"Cache: {len(cached_counts)} hits / {len(missing)} misses"
f" / {len(all_test_files)} total"
)
new_counts: dict[Path, int] = {}
if missing:
# On a full cold-cache run, hand pytest the top-level directories
# instead of 5000+ individual file paths: pytest walks dirs much
# faster than it resolves each file argument. Once any cache hits
# exist, use file-level collection so we only re-collect the diff.
if not cached_counts:
collect_paths = _enumerate_batch_paths(path)
else:
collect_paths = missing
new_counts = _run_pytest_collect(collect_paths)
counts: dict[Path, int] = {**cached_counts, **new_counts}
folder = TestFolder(path)
for file_path, total_tests in counts.items():
if total_tests == 0:
# Files with no collected tests (eg helper modules named
# test_init.py with no test functions) shouldn't enter
# bucketing, but we still cache them below as count=0 so
# they don't get re-collected next run.
continue
folder.add_test_file(TestFile(total_tests, file_path))
# Rebuild the cache from scratch on every run so deleted files are
# dropped and re-collected files get a refreshed hash.
missing_set = set(missing)
updated_entries: dict[str, _CacheEntry] = {}
for file in all_test_files:
if file in counts:
count = counts[file]
elif file in missing_set:
# We asked pytest about this file and got no count back,
# so it has no collectible tests; cache it as 0 to avoid
# repeating the work next run.
count = 0
else:
continue
updated_entries[str(file.relative_to(path))] = _CacheEntry(
hash=_hash_file(file), count=count
)
_Cache(conftest_hash=conftest_hash, entries=updated_entries).save(cache_path)
file = TestFile(int(total_tests), Path(file_path))
folder.add_test_file(file)
return folder
def collect_tests(path: Path, cache_path: Path | None = None) -> TestFolder:
"""Collect all tests, using an on-disk cache when ``cache_path`` is set."""
if cache_path is None:
return _collect_tests_uncached(path)
if path.is_file():
# The cache keys on conftest_hash, but a single file root has no
# ancestor conftests to walk and the hash would always be empty,
# which would let stale counts survive conftest edits. Skip the
# cache for the file-root case rather than silently mis-caching.
print(f"--cache ignored: {path} is a single file")
return _collect_tests_uncached(path)
return _collect_tests_cached(path, cache_path)
def main() -> None:
"""Execute script."""
parser = argparse.ArgumentParser(description="Split tests into n buckets.")
@@ -519,17 +276,11 @@ def main() -> None:
help="Path to the test files to split into buckets",
type=Path,
)
parser.add_argument(
"--cache",
help="Path to a JSON file used to cache per-file test counts",
type=Path,
default=None,
)
arguments = parser.parse_args()
print("Collecting tests...")
tests = collect_tests(arguments.path, arguments.cache)
tests = collect_tests(arguments.path)
tests_per_bucket = ceil(tests.total_tests / arguments.bucket_count)
bucket_holder = BucketHolder(tests_per_bucket, arguments.bucket_count)
-383
View File
@@ -1,383 +0,0 @@
"""Tests for the split_tests cache logic."""
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from script import split_tests
@pytest.fixture
def tree(tmp_path: Path) -> Path:
"""Build a small test tree on disk.
Returns the root path containing one conftest, two integrations,
and an unrelated helper module that the splitter should ignore.
"""
(tmp_path / "conftest.py").write_text("# tests/conftest.py\n")
(tmp_path / "common.py").write_text("# helper module\n")
alpha_dir = tmp_path / "components" / "alpha"
alpha_dir.mkdir(parents=True)
(alpha_dir / "conftest.py").write_text("# alpha conftest\n")
(alpha_dir / "test_one.py").write_text("def test_a():\n pass\n")
(alpha_dir / "test_two.py").write_text("def test_b():\n pass\n")
beta_dir = tmp_path / "components" / "beta"
beta_dir.mkdir()
(beta_dir / "test_x.py").write_text("def test_x():\n pass\n")
return tmp_path
def test_iter_eligible_children_filters_helpers(tree: Path) -> None:
"""Helper files like conftest.py and common.py are not collection targets."""
children = split_tests._iter_eligible_children(tree)
names = {p.name for p in children}
assert "common.py" not in names
assert "conftest.py" not in names
# components/ is a dir, gets included.
assert "components" in names
def test_enumerate_batch_paths_fans_out_components(tree: Path) -> None:
"""tests/components fans out one level deeper into per-integration paths."""
paths = split_tests._enumerate_batch_paths(tree)
rel = {p.relative_to(tree).as_posix() for p in paths}
assert rel == {"components/beta", "components/alpha"}
def test_enumerate_batch_paths_for_single_file(tmp_path: Path) -> None:
"""A test file passed directly is returned as-is."""
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
assert split_tests._enumerate_batch_paths(file) == [file]
def _conftest_hash_for(tree: Path) -> str:
"""Compute the conftest hash for ``tree`` (helper for the tests below)."""
_, conftests = split_tests._walk_test_tree(tree)
return split_tests._compute_conftest_hash(tree, conftests)
def test_compute_conftest_hash_changes_when_conftest_changes(tree: Path) -> None:
"""Editing any conftest changes the global cache key."""
before = _conftest_hash_for(tree)
(tree / "components" / "alpha" / "conftest.py").write_text("# changed\n")
after = _conftest_hash_for(tree)
assert before != after
def test_compute_conftest_hash_stable_for_non_conftest_changes(tree: Path) -> None:
"""Test-file edits do not invalidate the global cache key."""
before = _conftest_hash_for(tree)
(tree / "components" / "alpha" / "test_one.py").write_text(
"def test_a():\n pass\n\ndef test_c():\n pass\n"
)
after = _conftest_hash_for(tree)
assert before == after
def test_walk_test_tree_finds_tests_and_conftests(tree: Path) -> None:
"""The walker returns test files and conftest files but no helpers."""
test_files, conftests = split_tests._walk_test_tree(tree)
test_names = {p.name for p in test_files}
conftest_paths = {p.relative_to(tree).as_posix() for p in conftests}
assert test_names == {"test_one.py", "test_two.py", "test_x.py"}
assert conftest_paths == {"conftest.py", "components/alpha/conftest.py"}
def test_walk_test_tree_skips_hidden_and_dunder_dirs(tmp_path: Path) -> None:
"""Hidden/dunder directories are pruned from the walk."""
(tmp_path / "__pycache__").mkdir()
(tmp_path / "__pycache__" / "test_ghost.py").write_text("def test_g(): pass\n")
(tmp_path / ".hidden").mkdir()
(tmp_path / ".hidden" / "test_invisible.py").write_text("def test_h(): pass\n")
(tmp_path / "test_real.py").write_text("def test_r(): pass\n")
test_files, _ = split_tests._walk_test_tree(tmp_path)
assert {p.name for p in test_files} == {"test_real.py"}
def test_walk_test_tree_handles_single_file(tmp_path: Path) -> None:
"""Passing a single test file returns just that file."""
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
assert split_tests._walk_test_tree(file) == ([file], [])
def test_collect_tests_skips_cache_for_single_file_root(tmp_path: Path) -> None:
"""A single-file root cannot validate conftest drift, so caching is disabled.
_walk_test_tree returns no conftests for a file root, which would make
the conftest_hash a constant — letting a stale entry survive a real
conftest change. Better to bypass the cache than mis-cache silently.
"""
cache_path = tmp_path / "cache.json"
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
with (
patch.object(split_tests, "_collect_tests_uncached") as uncached,
patch.object(split_tests, "_collect_tests_cached") as cached,
):
split_tests.collect_tests(file, cache_path)
uncached.assert_called_once_with(file)
cached.assert_not_called()
assert not cache_path.exists()
def test_cache_roundtrip(tmp_path: Path) -> None:
"""A cache survives save → load when the conftest hash matches."""
cache_path = tmp_path / "cache.json"
cache = split_tests._Cache(
conftest_hash="abc",
entries={"tests/alpha/test_a.py": split_tests._CacheEntry(hash="h1", count=5)},
)
cache.save(cache_path)
loaded = split_tests._Cache.load(cache_path, "abc")
assert loaded.entries == cache.entries
assert loaded.conftest_hash == "abc"
def test_cache_load_missing_returns_empty(tmp_path: Path) -> None:
"""A missing cache file degrades gracefully to an empty cache."""
cache = split_tests._Cache.load(tmp_path / "missing.json", "abc")
assert cache.entries == {}
assert cache.conftest_hash == "abc"
def test_cache_load_invalid_json_returns_empty(tmp_path: Path) -> None:
"""Corrupt JSON is treated as a cache miss instead of crashing."""
path = tmp_path / "broken.json"
path.write_text("{not json")
cache = split_tests._Cache.load(path, "abc")
assert cache.entries == {}
def test_cache_load_wrong_version_returns_empty(tmp_path: Path) -> None:
"""An older cache schema is discarded rather than misread."""
path = tmp_path / "old.json"
path.write_text(json.dumps({"version": 0, "conftest_hash": "abc", "files": {}}))
cache = split_tests._Cache.load(path, "abc")
assert cache.entries == {}
def test_cache_load_conftest_drift_returns_empty(tmp_path: Path) -> None:
"""A conftest change invalidates the entire cached set."""
path = tmp_path / "cache.json"
path.write_text(
json.dumps(
{
"version": split_tests._CACHE_VERSION,
"conftest_hash": "old",
"files": {"test_a.py": {"hash": "h1", "count": 3}},
}
)
)
cache = split_tests._Cache.load(path, "new")
assert cache.entries == {}
def test_cache_load_drops_malformed_entries(tmp_path: Path) -> None:
"""Malformed per-file entries are skipped, valid ones are kept."""
path = tmp_path / "cache.json"
path.write_text(
json.dumps(
{
"version": split_tests._CACHE_VERSION,
"conftest_hash": "abc",
"files": {
"good.py": {"hash": "h1", "count": 3},
"bad_count.py": {"hash": "h2", "count": "three"},
"missing_hash.py": {"count": 4},
"not_dict.py": 5,
},
}
)
)
cache = split_tests._Cache.load(path, "abc")
assert set(cache.entries) == {"good.py"}
def test_resolve_from_cache_hits_and_misses(tree: Path) -> None:
"""Files with matching hashes are hits; edited or new files are misses."""
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
cache = split_tests._Cache(
conftest_hash="dummy",
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
str(alpha_two.relative_to(tree)): split_tests._CacheEntry(
hash="stale", count=99
),
},
)
cached, missing = split_tests._resolve_from_cache(
[alpha_one, alpha_two, beta_x], cache, tree
)
assert cached == {alpha_one: 1}
assert set(missing) == {alpha_two, beta_x}
def test_collect_tests_warm_cache_skips_pytest(tree: Path) -> None:
"""A warm cache with no diffs should skip the pytest subprocess entirely."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
str(alpha_two.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_two), count=2
),
str(beta_x.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(beta_x), count=3
),
},
).save(cache_path)
with patch.object(split_tests, "_run_collect_batches") as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
run_batches.assert_not_called()
assert folder.total_tests == 6
def test_collect_tests_cold_cache_collects_only_missing(tree: Path) -> None:
"""A partial cache should only re-collect the files that changed."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
},
).save(cache_path)
def fake_run_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
# Re-collected files emit one fake test each so we can verify which
# ones the batched runner was asked for.
return [
(
"\n".join(f"{p}: 1" for p in paths) + "\n",
"",
0,
)
]
with patch.object(
split_tests, "_run_collect_batches", side_effect=fake_run_batches
) as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
assert run_batches.call_count == 1
requested = set(run_batches.call_args.args[0])
assert requested == {alpha_two, beta_x}
assert folder.total_tests == 3
# Cache should now contain entries for every test file.
saved = json.loads(cache_path.read_text())
assert set(saved["files"]) == {
str(alpha_one.relative_to(tree)),
str(alpha_two.relative_to(tree)),
str(beta_x.relative_to(tree)),
}
def test_collect_tests_caches_files_with_no_collected_tests(tree: Path) -> None:
"""Files pytest returns nothing for are cached as 0 so we stop re-collecting them.
Helper modules named test_*.py with no actual test functions look like
test files to the walker but pytest reports no tests for them. We
want the cache to remember that and skip them on subsequent runs.
"""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
# Prime the cache with one hit so collect_tests takes the file-level
# diff path; the cold-cache path hands pytest top-level directories
# rather than individual file paths.
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
},
).save(cache_path)
def fake_run_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
# Pretend pytest didn't see alpha_two at all.
emitted = [p for p in paths if p != alpha_two]
return [("\n".join(f"{p}: 1" for p in emitted) + "\n", "", 0)]
with patch.object(
split_tests, "_run_collect_batches", side_effect=fake_run_batches
):
split_tests.collect_tests(tree, cache_path)
saved = json.loads(cache_path.read_text())
assert saved["files"][str(alpha_two.relative_to(tree))]["count"] == 0
assert saved["files"][str(alpha_one.relative_to(tree))]["count"] == 1
assert saved["files"][str(beta_x.relative_to(tree))]["count"] == 1
# Re-running with the same content should now be a full cache hit
# even though alpha_two has no tests.
with patch.object(split_tests, "_run_collect_batches") as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
run_batches.assert_not_called()
# alpha_two contributes 0, only alpha_one + beta_x count.
assert folder.total_tests == 2
def test_collect_tests_drops_deleted_files_from_cache(tree: Path) -> None:
"""Files that disappear from disk are dropped from the saved cache."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
ghost_rel = "components/alpha/test_ghost.py"
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
ghost_rel: split_tests._CacheEntry(hash="dead", count=42),
},
).save(cache_path)
def fake_run_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
return [
(
"\n".join(f"{p}: 1" for p in paths) + "\n",
"",
0,
)
]
with patch.object(
split_tests, "_run_collect_batches", side_effect=fake_run_batches
):
split_tests.collect_tests(tree, cache_path)
saved = json.loads(cache_path.read_text())
assert ghost_rel not in saved["files"]