Compare commits

..

10 Commits

Author SHA1 Message Date
J. Nick Koston 22fb68b7a1 Revert "DNM: test cache, touch cloud manifest only"
This reverts commit a8bc244a7a.
2026-05-21 16:53:19 -05:00
J. Nick Koston 81e06539e6 Revert "DNM: test cache bust, touch cloud conftest"
This reverts commit 7c18b67b2e.
2026-05-21 16:53:19 -05:00
J. Nick Koston 7c18b67b2e DNM: test cache bust, touch cloud conftest 2026-05-21 16:48:05 -05:00
J. Nick Koston a8bc244a7a DNM: test cache, touch cloud manifest only 2026-05-21 16:44:43 -05:00
J. Nick Koston 5975f4b179 Skip cache walking when --cache is not passed
Address Copilot review feedback on the cache PR:

* Split collect_tests into _collect_tests_uncached (the original
  directory-based pre-cache flow) and _collect_tests_cached (walks
  the tree to build per-file hashes).  Without --cache we now skip
  the walk + per-file hash entirely.
* A single-file root has no ancestor conftests to walk, so the
  conftest_hash would always be empty and stale counts could survive
  a real conftest change; bypass the cache for the file-root case.
* Save the cache file with explicit utf-8 encoding and
  ensure_ascii=False.
2026-05-21 16:08:44 -05:00
J. Nick Koston 9ed16b63a3 Cache per-file test counts in split_tests
Persist the result of pytest --collect-only between CI runs as a JSON
file keyed by content hash, so unchanged test files are served from
cache and only edited or new files are re-collected.  The cache is
self-healing:

* Missing, corrupt, or wrong-version files fall back to a full collect.
* Any conftest.py change anywhere under the test root invalidates the
  whole cache, so fixture parametrization shifts cannot silently skew
  counts.
* Files pytest returns nothing for (helper modules named test_*.py with
  no test functions) are cached as zero so they don't get re-collected
  forever.

Walking is done once with os.walk (~2x faster than Path.rglob) and
collects test files plus conftests in a single pass.  When the cache
is fully cold we feed pytest top-level directories rather than
thousands of file paths so cold runs stay as fast as before the cache
landed.

Wire the new --cache flag through the prepare-pytest-full job and back
the cache file with actions/cache so PRs can pick up the latest dev
snapshot via restore-keys.  Local timings: cold 11s, warm with no diff
0.4s, warm with one file edited 2.3s.
2026-05-21 15:56:08 -05:00
J. Nick Koston 8dadaa2f9e Filter fan-out children and fail fast on empty batch list
Only pass directories and test_*.py files to pytest --collect-only so
helpers like tests/components/conftest.py and tests/components/common.py
are not treated as explicit collection targets, and bail out with a
clear error if no eligible paths are found instead of running pytest
with no arguments.
2026-05-21 15:17:42 -05:00
J. Nick Koston 4f98c71586 Run pytest --collect-only in parallel batches in split_tests
cProfile showed 99.6% of split_tests.py wall time was spent in the
single pytest --collect-only subprocess.  Fan out the collection across
``os.cpu_count()`` workers; round-robin chunking keeps each batch
roughly equal, and tests/components is expanded one level deeper so
the ~1000 integration subdirectories distribute evenly.  Local wall
time dropped from ~132s to ~11s on an 18-core box.  Bucket output is
unchanged because we still parse the same pytest -qq output, just
aggregated from multiple invocations.
2026-05-21 15:10:01 -05:00
Abílio Costa bffb0417cc Instruct agents to run prek after doing changes (#171757) 2026-05-21 20:16:26 +01:00
G Johansson 8b8c687fc3 Remove not needed exception handling in dnsip (#171758) 2026-05-21 20:58:32 +02:00
8 changed files with 778 additions and 135 deletions
+1
View File
@@ -25,6 +25,7 @@ This repository contains the core of Home Assistant, a Python 3 based home autom
- When entering a new environment or worktree, run `script/setup` to set up the virtual environment with all development dependencies (pylint, pre-commit hooks, etc.). This is required before committing.
- .vscode/tasks.json contains useful commands used for development.
- After finishing a code session, run `uv run prek run --all-files` to check for linting and formatting issues.
## Python Syntax Notes
+12 -1
View File
@@ -917,12 +917,23 @@ jobs:
key: >-
${{ runner.os }}-${{ runner.arch }}-${{ steps.python.outputs.python-version }}-${{
needs.info.outputs.python_cache_key }}
- name: Restore pytest test counts cache
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: pytest_test_counts.json
key: >-
pytest-counts-${{ runner.os }}-${{ runner.arch }}-${{
steps.python.outputs.python-version }}-${{ github.sha }}
restore-keys: |
pytest-counts-${{ runner.os }}-${{ runner.arch }}-${{ steps.python.outputs.python-version }}-
- name: Run split_tests.py
env:
TEST_GROUP_COUNT: ${{ needs.info.outputs.test_group_count }}
run: |
. venv/bin/activate
python -m script.split_tests ${TEST_GROUP_COUNT} tests
python -m script.split_tests \
--cache pytest_test_counts.json \
${TEST_GROUP_COUNT} tests
- name: Upload pytest_buckets
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
+1
View File
@@ -15,6 +15,7 @@ This repository contains the core of Home Assistant, a Python 3 based home autom
- When entering a new environment or worktree, run `script/setup` to set up the virtual environment with all development dependencies (pylint, pre-commit hooks, etc.). This is required before committing.
- .vscode/tasks.json contains useful commands used for development.
- After finishing a code session, run `uv run prek run --all-files` to check for linting and formatting issues.
## Python Syntax Notes
+1 -6
View File
@@ -6,7 +6,6 @@ import logging
import aiodns
from aiodns.error import DNSError
from pycares import AresError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_PORT
@@ -78,11 +77,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: DnsIPConfigEntry) -> boo
) from err
errors = [
result
for result in results
if isinstance(
result, (TimeoutError, DNSError, AresError, asyncio.CancelledError)
)
result for result in results if isinstance(result, (TimeoutError, DNSError))
]
if errors and len(errors) == len(results):
await _close_resolvers()
+328 -20
View File
@@ -2,20 +2,30 @@
"""Helper script to split test into n buckets."""
import argparse
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
import hashlib
import json
from math import ceil
import os
from pathlib import Path
import subprocess
import sys
from typing import Final
# tests/components has ~1000 sub-directories, which makes it the natural
# place to subdivide to keep each pytest invocation roughly equal in size.
_FAN_OUT_DIRS: Final = frozenset({"components"})
# Cache file format version; bump on any incompatible schema change so old
# caches are ignored rather than misread.
_CACHE_VERSION: Final = 1
class Bucket:
"""Class to hold bucket."""
def __init__(
self,
):
def __init__(self) -> None:
"""Initialize bucket."""
self.total_tests = 0
self._paths: list[str] = []
@@ -77,7 +87,7 @@ class BucketHolder:
def create_ouput_file(self) -> None:
"""Create output file."""
with Path("pytest_buckets.txt").open("w") as file:
with Path("pytest_buckets.txt").open("w", encoding="utf-8") as file:
for idx, bucket in enumerate(self._buckets):
print(f"Bucket {idx + 1} has {bucket.total_tests} tests")
file.write(bucket.get_paths_line())
@@ -164,37 +174,329 @@ class TestFolder:
return result
def collect_tests(path: Path) -> TestFolder:
"""Collect all tests."""
def _collect_batch(paths: list[Path]) -> tuple[str, str, int]:
"""Run pytest --collect-only on a batch of paths."""
result = subprocess.run(
["pytest", "--collect-only", "-qq", "-p", "no:warnings", path],
["pytest", "--collect-only", "-qq", "-p", "no:warnings", *map(str, paths)],
check=False,
capture_output=True,
text=True,
)
return result.stdout, result.stderr, result.returncode
if result.returncode != 0:
print("Failed to collect tests:")
print(result.stderr)
print(result.stdout)
sys.exit(1)
folder = TestFolder(path)
def _iter_eligible_children(path: Path) -> list[Path]:
"""Return immediate children of ``path`` that pytest should collect.
for line in result.stdout.splitlines():
Filters out hidden/dunder entries, non-``test_*.py`` files (so helper
modules like ``conftest.py`` and ``common.py`` are not passed as
explicit collection targets), and pycache-style directories.
"""
children: list[Path] = []
for entry in sorted(path.iterdir()):
if entry.name.startswith((".", "_")):
continue
if entry.is_dir() or (entry.suffix == ".py" and entry.name.startswith("test_")):
children.append(entry)
return children
def _enumerate_batch_paths(path: Path) -> list[Path]:
"""Return the child paths to run pytest --collect-only over.
Files are returned as-is. Directories are expanded one level deep, with
a second level of expansion for entries named in ``_FAN_OUT_DIRS`` so the
enormous ``tests/components`` tree fans out into per-integration paths.
"""
if path.is_file():
return [path]
paths: list[Path] = []
for entry in _iter_eligible_children(path):
if entry.is_dir() and entry.name in _FAN_OUT_DIRS:
paths.extend(_iter_eligible_children(entry))
else:
paths.append(entry)
return paths
def _hash_file(path: Path) -> str:
"""Return a short content hash for ``path``."""
return hashlib.sha256(path.read_bytes()).hexdigest()[:16]
def _walk_test_tree(root: Path) -> tuple[list[Path], list[Path]]:
"""Walk ``root`` once and return (test files, conftest files).
Uses ``os.walk`` rather than ``Path.rglob`` because it's ~2x faster on
a 5000-file tree, and we prune hidden/dunder subdirectories instead of
visiting them. Doing both walks in one pass keeps total tree I/O down.
"""
if root.is_file():
if root.name.startswith("test_") and root.suffix == ".py":
return [root], []
return [], []
test_files: list[Path] = []
conftests: list[Path] = []
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if not d.startswith((".", "_"))]
base = Path(dirpath)
for name in filenames:
if name == "conftest.py":
conftests.append(base / name)
elif name.startswith("test_") and name.endswith(".py"):
test_files.append(base / name)
test_files.sort()
conftests.sort()
return test_files, conftests
def _compute_conftest_hash(root: Path, conftests: list[Path]) -> str:
"""Return a hash that changes whenever any conftest.py under ``root`` changes.
Any change to a conftest invalidates the entire test-count cache. This is
coarse but safe: conftests can change fixture parametrization in ways the
cache cannot otherwise detect, so we just re-collect everything.
"""
digest = hashlib.sha256()
for conftest in conftests:
digest.update(str(conftest.relative_to(root)).encode())
digest.update(b"\0")
digest.update(conftest.read_bytes())
digest.update(b"\0")
return digest.hexdigest()
@dataclass
class _CacheEntry:
"""Cached test count for a single file."""
hash: str
count: int
@dataclass
class _Cache:
"""Mapping of test file path → cached entry, plus invalidation key."""
conftest_hash: str
entries: dict[str, _CacheEntry]
@classmethod
def empty(cls, conftest_hash: str = "") -> _Cache:
"""Return a new empty cache."""
return cls(conftest_hash=conftest_hash, entries={})
@classmethod
def load(cls, path: Path, current_conftest_hash: str) -> _Cache:
"""Load cache from ``path`` and invalidate it on schema/conftest drift.
Any failure (missing file, bad JSON, version drift, conftest drift)
returns an empty cache so the script just falls back to a full
collection. This is the self-healing path.
"""
try:
raw = json.loads(path.read_bytes())
except OSError, ValueError:
return cls.empty(current_conftest_hash)
if not isinstance(raw, dict) or raw.get("version") != _CACHE_VERSION:
return cls.empty(current_conftest_hash)
if raw.get("conftest_hash") != current_conftest_hash:
return cls.empty(current_conftest_hash)
files = raw.get("files")
if not isinstance(files, dict):
return cls.empty(current_conftest_hash)
entries: dict[str, _CacheEntry] = {}
for key, value in files.items():
if (
not isinstance(value, dict)
or not isinstance(value.get("hash"), str)
or not isinstance(value.get("count"), int)
):
# Skip malformed entries instead of discarding the whole cache.
continue
entries[key] = _CacheEntry(hash=value["hash"], count=value["count"])
return cls(conftest_hash=current_conftest_hash, entries=entries)
def save(self, path: Path) -> None:
"""Write the cache to ``path``."""
path.write_text(
json.dumps(
{
"version": _CACHE_VERSION,
"conftest_hash": self.conftest_hash,
"files": {
key: {"hash": entry.hash, "count": entry.count}
for key, entry in sorted(self.entries.items())
},
},
indent=2,
ensure_ascii=False,
)
+ "\n",
encoding="utf-8",
)
def _resolve_from_cache(
test_files: list[Path],
cache: _Cache,
root: Path,
) -> tuple[dict[Path, int], list[Path]]:
"""Split ``test_files`` into ``(cached_counts, needs_collection)``.
A file is served from cache when its content hash matches what we
previously stored; otherwise it is queued for re-collection.
"""
cached: dict[Path, int] = {}
misses: list[Path] = []
for file in test_files:
key = str(file.relative_to(root))
entry = cache.entries.get(key)
if entry is None:
misses.append(file)
continue
if entry.hash != _hash_file(file):
misses.append(file)
continue
cached[file] = entry.count
return cached, misses
def _run_collect_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
"""Run pytest --collect-only across ``paths`` using a process pool."""
workers = min(len(paths), os.cpu_count() or 1) or 1
batches = [paths[i::workers] for i in range(workers)]
if workers == 1:
return [_collect_batch(batches[0])]
with ProcessPoolExecutor(max_workers=workers) as executor:
return list(executor.map(_collect_batch, batches))
def _parse_collect_output(stdout: str) -> dict[Path, int]:
"""Parse ``pytest --collect-only -qq`` output into ``{path: count}``."""
counts: dict[Path, int] = {}
for line in stdout.splitlines():
if not line.strip():
continue
file_path, _, total_tests = line.partition(": ")
if not path or not total_tests:
print(f"Unexpected line: {line}")
sys.exit(1)
if not file_path or not total_tests:
raise ValueError(f"Unexpected line: {line}")
counts[Path(file_path)] = int(total_tests)
return counts
file = TestFile(int(total_tests), Path(file_path))
folder.add_test_file(file)
def _run_pytest_collect(paths: list[Path]) -> dict[Path, int]:
"""Run pytest --collect-only across ``paths`` and parse the output."""
counts: dict[Path, int] = {}
for stdout, stderr, returncode in _run_collect_batches(paths):
if returncode != 0:
print("Failed to collect tests:")
print(stderr)
print(stdout)
sys.exit(1)
try:
counts.update(_parse_collect_output(stdout))
except ValueError as err:
print(err)
sys.exit(1)
return counts
def _collect_tests_uncached(path: Path) -> TestFolder:
"""Collect tests by handing pytest the top-level directories.
Skips the tree walk and per-file hashing; used when no cache file is
requested so the script behaves like the pre-cache implementation.
"""
batch_paths = _enumerate_batch_paths(path)
if not batch_paths:
print(f"No eligible test paths found under {path}")
sys.exit(1)
folder = TestFolder(path)
for file_path, total_tests in _run_pytest_collect(batch_paths).items():
folder.add_test_file(TestFile(total_tests, file_path))
return folder
def _collect_tests_cached(path: Path, cache_path: Path) -> TestFolder:
"""Collect tests using an on-disk cache for incremental updates."""
all_test_files, conftests = _walk_test_tree(path)
if not all_test_files:
print(f"No eligible test paths found under {path}")
sys.exit(1)
conftest_hash = _compute_conftest_hash(path, conftests)
cache = _Cache.load(cache_path, conftest_hash)
cached_counts, missing = _resolve_from_cache(all_test_files, cache, path)
print(
f"Cache: {len(cached_counts)} hits / {len(missing)} misses"
f" / {len(all_test_files)} total"
)
new_counts: dict[Path, int] = {}
if missing:
# On a full cold-cache run, hand pytest the top-level directories
# instead of 5000+ individual file paths: pytest walks dirs much
# faster than it resolves each file argument. Once any cache hits
# exist, use file-level collection so we only re-collect the diff.
if not cached_counts:
collect_paths = _enumerate_batch_paths(path)
else:
collect_paths = missing
new_counts = _run_pytest_collect(collect_paths)
counts: dict[Path, int] = {**cached_counts, **new_counts}
folder = TestFolder(path)
for file_path, total_tests in counts.items():
if total_tests == 0:
# Files with no collected tests (eg helper modules named
# test_init.py with no test functions) shouldn't enter
# bucketing, but we still cache them below as count=0 so
# they don't get re-collected next run.
continue
folder.add_test_file(TestFile(total_tests, file_path))
# Rebuild the cache from scratch on every run so deleted files are
# dropped and re-collected files get a refreshed hash.
missing_set = set(missing)
updated_entries: dict[str, _CacheEntry] = {}
for file in all_test_files:
if file in counts:
count = counts[file]
elif file in missing_set:
# We asked pytest about this file and got no count back,
# so it has no collectible tests; cache it as 0 to avoid
# repeating the work next run.
count = 0
else:
continue
updated_entries[str(file.relative_to(path))] = _CacheEntry(
hash=_hash_file(file), count=count
)
_Cache(conftest_hash=conftest_hash, entries=updated_entries).save(cache_path)
return folder
def collect_tests(path: Path, cache_path: Path | None = None) -> TestFolder:
"""Collect all tests, using an on-disk cache when ``cache_path`` is set."""
if cache_path is None:
return _collect_tests_uncached(path)
if path.is_file():
# The cache keys on conftest_hash, but a single file root has no
# ancestor conftests to walk and the hash would always be empty,
# which would let stale counts survive conftest edits. Skip the
# cache for the file-root case rather than silently mis-caching.
print(f"--cache ignored: {path} is a single file")
return _collect_tests_uncached(path)
return _collect_tests_cached(path, cache_path)
def main() -> None:
"""Execute script."""
parser = argparse.ArgumentParser(description="Split tests into n buckets.")
@@ -217,11 +519,17 @@ def main() -> None:
help="Path to the test files to split into buckets",
type=Path,
)
parser.add_argument(
"--cache",
help="Path to a JSON file used to cache per-file test counts",
type=Path,
default=None,
)
arguments = parser.parse_args()
print("Collecting tests...")
tests = collect_tests(arguments.path)
tests = collect_tests(arguments.path, arguments.cache)
tests_per_bucket = ceil(tests.total_tests / arguments.bucket_count)
bucket_holder = BucketHolder(tests_per_bucket, arguments.bucket_count)
-4
View File
@@ -1,10 +1,8 @@
"""Test for DNS IP integration Init."""
import asyncio
from unittest.mock import patch
from aiodns.error import DNSError
from pycares import AresError
import pytest
from homeassistant.components.dnsip.const import (
@@ -180,8 +178,6 @@ async def test_migrate_error_from_future(hass: HomeAssistant) -> None:
[
TimeoutError(),
DNSError(),
AresError(),
asyncio.CancelledError(),
],
)
async def test_setup_dns_error(hass: HomeAssistant, error: Exception) -> None:
+52 -104
View File
@@ -1,10 +1,7 @@
"""The tests the History component websocket_api."""
import asyncio
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from datetime import timedelta
from typing import Any
from unittest.mock import ANY, patch
from freezegun import freeze_time
@@ -12,12 +9,7 @@ import pytest
from homeassistant.components import history
from homeassistant.components.history import websocket_api
from homeassistant.const import (
EVENT_HOMEASSISTANT_FINAL_WRITE,
EVENT_STATE_CHANGED,
STATE_OFF,
STATE_ON,
)
from homeassistant.const import EVENT_HOMEASSISTANT_FINAL_WRITE, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.setup import async_setup_component
@@ -43,27 +35,6 @@ def listeners_without_writes(listeners: dict[str, int]) -> dict[str, int]:
}
@contextmanager
def assert_no_listener_leak(hass: HomeAssistant) -> Iterator[None]:
"""Capture bus listeners on entry, assert no leak on exit.
EVENT_STATE_CHANGED is excluded because unrelated components can
asynchronously add or remove state_changed listeners during a test.
"""
excluded = {EVENT_HOMEASSISTANT_FINAL_WRITE, EVENT_STATE_CHANGED}
def _snapshot() -> dict[str, int]:
return {
key: value
for key, value in hass.bus.async_listeners().items()
if key not in excluded
}
before = _snapshot()
yield
assert _snapshot() == before
@pytest.mark.usefixtures("hass_history")
def test_setup() -> None:
"""Test setup method of history."""
@@ -1600,28 +1571,7 @@ async def test_overflow_queue(
"""Test overflowing the history stream queue."""
now = dt_util.utcnow()
wanted_entities = ["sensor.two", "sensor.four", "sensor.one"]
unsub_calls = 0
def spy_track_state_change_event(*args: Any, **kwargs: Any) -> Callable[[], None]:
nonlocal unsub_calls
real_unsub = async_track_state_change_event(*args, **kwargs)
def wrapped_unsub() -> None:
nonlocal unsub_calls
unsub_calls += 1
real_unsub()
return wrapped_unsub
with (
patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5),
patch.object(
websocket_api,
"async_track_state_change_event",
spy_track_state_change_event,
),
):
with patch.object(websocket_api, "MAX_PENDING_HISTORY_STATES", 5):
await async_setup_component(
hass,
"history",
@@ -1645,63 +1595,61 @@ async def test_overflow_queue(
await async_wait_recording_done(hass)
client = await hass_ws_client()
init_listeners = hass.bus.async_listeners()
with assert_no_listener_leak(hass):
await client.send_json(
{
"id": 1,
"type": "history/stream",
"entity_ids": wanted_entities,
"start_time": now.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"
response = await client.receive_json()
first_end_time = sensor_two_last_updated_timestamp
assert response == {
"event": {
"end_time": pytest.approx(first_end_time),
"start_time": pytest.approx(now.timestamp()),
"states": {
"sensor.one": [
{
"lu": pytest.approx(sensor_one_last_updated_timestamp),
"s": "on",
}
],
"sensor.two": [
{
"lu": pytest.approx(sensor_two_last_updated_timestamp),
"s": "off",
}
],
},
},
await client.send_json(
{
"id": 1,
"type": "event",
"type": "history/stream",
"entity_ids": wanted_entities,
"start_time": now.isoformat(),
"include_start_time_state": True,
"significant_changes_only": False,
"no_attributes": True,
"minimal_response": True,
}
)
response = await client.receive_json()
assert response["success"]
assert response["id"] == 1
assert response["type"] == "result"
await async_recorder_block_till_done(hass)
# Overflow the queue
for val in range(10):
hass.states.async_set(
"sensor.one", str(val), attributes={"any": "attr"}
)
hass.states.async_set(
"sensor.two", str(val), attributes={"any": "attr"}
)
await async_recorder_block_till_done(hass)
response = await client.receive_json()
first_end_time = sensor_two_last_updated_timestamp
assert unsub_calls == 1
assert response == {
"event": {
"end_time": pytest.approx(first_end_time),
"start_time": pytest.approx(now.timestamp()),
"states": {
"sensor.one": [
{
"lu": pytest.approx(sensor_one_last_updated_timestamp),
"s": "on",
}
],
"sensor.two": [
{
"lu": pytest.approx(sensor_two_last_updated_timestamp),
"s": "off",
}
],
},
},
"id": 1,
"type": "event",
}
await async_recorder_block_till_done(hass)
# Overflow the queue
for val in range(10):
hass.states.async_set("sensor.one", str(val), attributes={"any": "attr"})
hass.states.async_set("sensor.two", str(val), attributes={"any": "attr"})
await async_recorder_block_till_done(hass)
assert listeners_without_writes(
hass.bus.async_listeners()
) == listeners_without_writes(init_listeners)
@pytest.mark.usefixtures("recorder_mock")
+383
View File
@@ -0,0 +1,383 @@
"""Tests for the split_tests cache logic."""
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from script import split_tests
@pytest.fixture
def tree(tmp_path: Path) -> Path:
"""Build a small test tree on disk.
Returns the root path containing one conftest, two integrations,
and an unrelated helper module that the splitter should ignore.
"""
(tmp_path / "conftest.py").write_text("# tests/conftest.py\n")
(tmp_path / "common.py").write_text("# helper module\n")
alpha_dir = tmp_path / "components" / "alpha"
alpha_dir.mkdir(parents=True)
(alpha_dir / "conftest.py").write_text("# alpha conftest\n")
(alpha_dir / "test_one.py").write_text("def test_a():\n pass\n")
(alpha_dir / "test_two.py").write_text("def test_b():\n pass\n")
beta_dir = tmp_path / "components" / "beta"
beta_dir.mkdir()
(beta_dir / "test_x.py").write_text("def test_x():\n pass\n")
return tmp_path
def test_iter_eligible_children_filters_helpers(tree: Path) -> None:
"""Helper files like conftest.py and common.py are not collection targets."""
children = split_tests._iter_eligible_children(tree)
names = {p.name for p in children}
assert "common.py" not in names
assert "conftest.py" not in names
# components/ is a dir, gets included.
assert "components" in names
def test_enumerate_batch_paths_fans_out_components(tree: Path) -> None:
"""tests/components fans out one level deeper into per-integration paths."""
paths = split_tests._enumerate_batch_paths(tree)
rel = {p.relative_to(tree).as_posix() for p in paths}
assert rel == {"components/beta", "components/alpha"}
def test_enumerate_batch_paths_for_single_file(tmp_path: Path) -> None:
"""A test file passed directly is returned as-is."""
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
assert split_tests._enumerate_batch_paths(file) == [file]
def _conftest_hash_for(tree: Path) -> str:
"""Compute the conftest hash for ``tree`` (helper for the tests below)."""
_, conftests = split_tests._walk_test_tree(tree)
return split_tests._compute_conftest_hash(tree, conftests)
def test_compute_conftest_hash_changes_when_conftest_changes(tree: Path) -> None:
"""Editing any conftest changes the global cache key."""
before = _conftest_hash_for(tree)
(tree / "components" / "alpha" / "conftest.py").write_text("# changed\n")
after = _conftest_hash_for(tree)
assert before != after
def test_compute_conftest_hash_stable_for_non_conftest_changes(tree: Path) -> None:
"""Test-file edits do not invalidate the global cache key."""
before = _conftest_hash_for(tree)
(tree / "components" / "alpha" / "test_one.py").write_text(
"def test_a():\n pass\n\ndef test_c():\n pass\n"
)
after = _conftest_hash_for(tree)
assert before == after
def test_walk_test_tree_finds_tests_and_conftests(tree: Path) -> None:
"""The walker returns test files and conftest files but no helpers."""
test_files, conftests = split_tests._walk_test_tree(tree)
test_names = {p.name for p in test_files}
conftest_paths = {p.relative_to(tree).as_posix() for p in conftests}
assert test_names == {"test_one.py", "test_two.py", "test_x.py"}
assert conftest_paths == {"conftest.py", "components/alpha/conftest.py"}
def test_walk_test_tree_skips_hidden_and_dunder_dirs(tmp_path: Path) -> None:
"""Hidden/dunder directories are pruned from the walk."""
(tmp_path / "__pycache__").mkdir()
(tmp_path / "__pycache__" / "test_ghost.py").write_text("def test_g(): pass\n")
(tmp_path / ".hidden").mkdir()
(tmp_path / ".hidden" / "test_invisible.py").write_text("def test_h(): pass\n")
(tmp_path / "test_real.py").write_text("def test_r(): pass\n")
test_files, _ = split_tests._walk_test_tree(tmp_path)
assert {p.name for p in test_files} == {"test_real.py"}
def test_walk_test_tree_handles_single_file(tmp_path: Path) -> None:
"""Passing a single test file returns just that file."""
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
assert split_tests._walk_test_tree(file) == ([file], [])
def test_collect_tests_skips_cache_for_single_file_root(tmp_path: Path) -> None:
"""A single-file root cannot validate conftest drift, so caching is disabled.
_walk_test_tree returns no conftests for a file root, which would make
the conftest_hash a constant — letting a stale entry survive a real
conftest change. Better to bypass the cache than mis-cache silently.
"""
cache_path = tmp_path / "cache.json"
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
with (
patch.object(split_tests, "_collect_tests_uncached") as uncached,
patch.object(split_tests, "_collect_tests_cached") as cached,
):
split_tests.collect_tests(file, cache_path)
uncached.assert_called_once_with(file)
cached.assert_not_called()
assert not cache_path.exists()
def test_cache_roundtrip(tmp_path: Path) -> None:
"""A cache survives save → load when the conftest hash matches."""
cache_path = tmp_path / "cache.json"
cache = split_tests._Cache(
conftest_hash="abc",
entries={"tests/alpha/test_a.py": split_tests._CacheEntry(hash="h1", count=5)},
)
cache.save(cache_path)
loaded = split_tests._Cache.load(cache_path, "abc")
assert loaded.entries == cache.entries
assert loaded.conftest_hash == "abc"
def test_cache_load_missing_returns_empty(tmp_path: Path) -> None:
"""A missing cache file degrades gracefully to an empty cache."""
cache = split_tests._Cache.load(tmp_path / "missing.json", "abc")
assert cache.entries == {}
assert cache.conftest_hash == "abc"
def test_cache_load_invalid_json_returns_empty(tmp_path: Path) -> None:
"""Corrupt JSON is treated as a cache miss instead of crashing."""
path = tmp_path / "broken.json"
path.write_text("{not json")
cache = split_tests._Cache.load(path, "abc")
assert cache.entries == {}
def test_cache_load_wrong_version_returns_empty(tmp_path: Path) -> None:
"""An older cache schema is discarded rather than misread."""
path = tmp_path / "old.json"
path.write_text(json.dumps({"version": 0, "conftest_hash": "abc", "files": {}}))
cache = split_tests._Cache.load(path, "abc")
assert cache.entries == {}
def test_cache_load_conftest_drift_returns_empty(tmp_path: Path) -> None:
"""A conftest change invalidates the entire cached set."""
path = tmp_path / "cache.json"
path.write_text(
json.dumps(
{
"version": split_tests._CACHE_VERSION,
"conftest_hash": "old",
"files": {"test_a.py": {"hash": "h1", "count": 3}},
}
)
)
cache = split_tests._Cache.load(path, "new")
assert cache.entries == {}
def test_cache_load_drops_malformed_entries(tmp_path: Path) -> None:
"""Malformed per-file entries are skipped, valid ones are kept."""
path = tmp_path / "cache.json"
path.write_text(
json.dumps(
{
"version": split_tests._CACHE_VERSION,
"conftest_hash": "abc",
"files": {
"good.py": {"hash": "h1", "count": 3},
"bad_count.py": {"hash": "h2", "count": "three"},
"missing_hash.py": {"count": 4},
"not_dict.py": 5,
},
}
)
)
cache = split_tests._Cache.load(path, "abc")
assert set(cache.entries) == {"good.py"}
def test_resolve_from_cache_hits_and_misses(tree: Path) -> None:
"""Files with matching hashes are hits; edited or new files are misses."""
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
cache = split_tests._Cache(
conftest_hash="dummy",
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
str(alpha_two.relative_to(tree)): split_tests._CacheEntry(
hash="stale", count=99
),
},
)
cached, missing = split_tests._resolve_from_cache(
[alpha_one, alpha_two, beta_x], cache, tree
)
assert cached == {alpha_one: 1}
assert set(missing) == {alpha_two, beta_x}
def test_collect_tests_warm_cache_skips_pytest(tree: Path) -> None:
"""A warm cache with no diffs should skip the pytest subprocess entirely."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
str(alpha_two.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_two), count=2
),
str(beta_x.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(beta_x), count=3
),
},
).save(cache_path)
with patch.object(split_tests, "_run_collect_batches") as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
run_batches.assert_not_called()
assert folder.total_tests == 6
def test_collect_tests_cold_cache_collects_only_missing(tree: Path) -> None:
"""A partial cache should only re-collect the files that changed."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
},
).save(cache_path)
def fake_run_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
# Re-collected files emit one fake test each so we can verify which
# ones the batched runner was asked for.
return [
(
"\n".join(f"{p}: 1" for p in paths) + "\n",
"",
0,
)
]
with patch.object(
split_tests, "_run_collect_batches", side_effect=fake_run_batches
) as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
assert run_batches.call_count == 1
requested = set(run_batches.call_args.args[0])
assert requested == {alpha_two, beta_x}
assert folder.total_tests == 3
# Cache should now contain entries for every test file.
saved = json.loads(cache_path.read_text())
assert set(saved["files"]) == {
str(alpha_one.relative_to(tree)),
str(alpha_two.relative_to(tree)),
str(beta_x.relative_to(tree)),
}
def test_collect_tests_caches_files_with_no_collected_tests(tree: Path) -> None:
"""Files pytest returns nothing for are cached as 0 so we stop re-collecting them.
Helper modules named test_*.py with no actual test functions look like
test files to the walker but pytest reports no tests for them. We
want the cache to remember that and skip them on subsequent runs.
"""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
# Prime the cache with one hit so collect_tests takes the file-level
# diff path; the cold-cache path hands pytest top-level directories
# rather than individual file paths.
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
},
).save(cache_path)
def fake_run_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
# Pretend pytest didn't see alpha_two at all.
emitted = [p for p in paths if p != alpha_two]
return [("\n".join(f"{p}: 1" for p in emitted) + "\n", "", 0)]
with patch.object(
split_tests, "_run_collect_batches", side_effect=fake_run_batches
):
split_tests.collect_tests(tree, cache_path)
saved = json.loads(cache_path.read_text())
assert saved["files"][str(alpha_two.relative_to(tree))]["count"] == 0
assert saved["files"][str(alpha_one.relative_to(tree))]["count"] == 1
assert saved["files"][str(beta_x.relative_to(tree))]["count"] == 1
# Re-running with the same content should now be a full cache hit
# even though alpha_two has no tests.
with patch.object(split_tests, "_run_collect_batches") as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
run_batches.assert_not_called()
# alpha_two contributes 0, only alpha_one + beta_x count.
assert folder.total_tests == 2
def test_collect_tests_drops_deleted_files_from_cache(tree: Path) -> None:
"""Files that disappear from disk are dropped from the saved cache."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
ghost_rel = "components/alpha/test_ghost.py"
split_tests._Cache(
conftest_hash=_conftest_hash_for(tree),
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one), count=1
),
ghost_rel: split_tests._CacheEntry(hash="dead", count=42),
},
).save(cache_path)
def fake_run_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
return [
(
"\n".join(f"{p}: 1" for p in paths) + "\n",
"",
0,
)
]
with patch.object(
split_tests, "_run_collect_batches", side_effect=fake_run_batches
):
split_tests.collect_tests(tree, cache_path)
saved = json.loads(cache_path.read_text())
assert ghost_rel not in saved["files"]