Compare commits

..

50 Commits

Author SHA1 Message Date
J. Nick Koston 38f35f9646 Reject single-file root in split_tests instead of crashing later 2026-05-27 14:32:57 -05:00
J. Nick Koston 3064f3128d Truncate fixture-scope hash to 16 chars for consistency with file hash 2026-05-27 14:31:13 -05:00
J. Nick Koston 3114fdd145 Drop bool check in split_tests cache count validation 2026-05-27 14:30:01 -05:00
J. Nick Koston f9ac843e08 Narrow split_tests project-root marker to pyproject.toml 2026-05-27 14:28:37 -05:00
J. Nick Koston f44d77e84f Merge print loop into bucket placement loop in split_tests 2026-05-27 14:27:26 -05:00
J. Nick Koston cccb169e4d Reset split_tests cache version to 1 2026-05-27 14:26:11 -05:00
J. Nick Koston 05534d6dfc Merge branch 'dev' into cache-split-tests 2026-05-26 18:31:42 -05:00
J. Nick Koston 728efa8d6c Merge branch 'dev' into cache-split-tests 2026-05-26 18:24:58 -05:00
J. Nick Koston fa1284e29b Merge branch 'dev' into cache-split-tests 2026-05-26 18:19:51 -05:00
J. Nick Koston bf48806ee9 Merge remote-tracking branch 'upstream/cache-split-tests' into cache-split-tests 2026-05-22 14:48:53 -05:00
J. Nick Koston 2c25c5ad26 another round of copilot comments 2026-05-22 14:48:31 -05:00
J. Nick Koston ff1177dde4 Merge branch 'dev' into cache-split-tests 2026-05-22 14:33:56 -05:00
J. Nick Koston 1cc91cd3b6 another round of copilot 2026-05-22 13:32:08 -05:00
J. Nick Koston ecac38a359 Merge branch 'dev' into cache-split-tests 2026-05-22 13:20:15 -05:00
J. Nick Koston 8301addc94 bot comments 2026-05-22 13:06:55 -05:00
J. Nick Koston 77bc932cf0 will copilot ever end 2026-05-22 12:57:17 -05:00
J. Nick Koston 11903ac62e dry 2026-05-22 12:54:51 -05:00
J. Nick Koston 878761cb41 preen 2026-05-22 12:49:43 -05:00
J. Nick Koston d7bf7df59f preen 2026-05-22 12:49:16 -05:00
J. Nick Koston e5890172a0 preen 2026-05-22 12:48:39 -05:00
J. Nick Koston e9a58cdd20 restore 2026-05-22 12:47:15 -05:00
J. Nick Koston cab7c41a7f more cleanups 2026-05-22 12:37:54 -05:00
J. Nick Koston 277a2d847a more cleanups 2026-05-22 12:30:30 -05:00
J. Nick Koston 7835a4992a simplify 2026-05-22 11:51:05 -05:00
J. Nick Koston 9dc37a2f46 Merge remote-tracking branch 'upstream/cache-split-tests' into cache-split-tests 2026-05-22 11:43:16 -05:00
J. Nick Koston 7534c438c1 fix cache bust 2026-05-22 11:42:22 -05:00
J. Nick Koston 69efa8ee1a Merge branch 'dev' into cache-split-tests 2026-05-22 11:28:43 -05:00
J. Nick Koston 305b5d6e00 preen 2026-05-22 11:00:46 -05:00
J. Nick Koston d94226260b handle bot review comments 2026-05-22 10:40:23 -05:00
J. Nick Koston ecc8e52f3e make bot happy 2026-05-22 10:33:22 -05:00
J. Nick Koston 5771b0c86c Merge remote-tracking branch 'refs/remotes/upstream/cache-split-tests' into cache-split-tests 2026-05-22 10:31:11 -05:00
J. Nick Koston 3e289da366 drop bad copilot suggest 2026-05-22 10:31:00 -05:00
J. Nick Koston 944fb1ef67 Merge branch 'dev' into cache-split-tests 2026-05-22 10:17:10 -05:00
J. Nick Koston 1b6e9f5094 trim 2026-05-22 09:53:51 -05:00
J. Nick Koston b2257caeb7 touch ups 2026-05-22 09:11:01 -05:00
J. Nick Koston 0ec0ea30ac single pass 2026-05-22 09:06:15 -05:00
J. Nick Koston 584b32c8b3 address copilot, cleanups 2026-05-22 09:01:39 -05:00
J. Nick Koston 4033a8b83a Apply suggestions from code review
Co-authored-by: J. Nick Koston <nick+github@koston.org>
2026-05-22 08:53:47 -05:00
J. Nick Koston add8a5f799 Merge branch 'dev' into cache-split-tests 2026-05-22 08:53:27 -05:00
J. Nick Koston 7c137b5c73 cleanup 2026-05-22 08:34:32 -05:00
J. Nick Koston 4a6c5b5a22 cleanups 2026-05-22 07:46:31 -05:00
J. Nick Koston 1009ce4180 Merge branch 'dev' into cache-split-tests 2026-05-21 23:09:44 -05:00
J. Nick Koston 22fb68b7a1 Revert "DNM: test cache, touch cloud manifest only"
This reverts commit a8bc244a7a.
2026-05-21 16:53:19 -05:00
J. Nick Koston 81e06539e6 Revert "DNM: test cache bust, touch cloud conftest"
This reverts commit 7c18b67b2e.
2026-05-21 16:53:19 -05:00
J. Nick Koston 7c18b67b2e DNM: test cache bust, touch cloud conftest 2026-05-21 16:48:05 -05:00
J. Nick Koston a8bc244a7a DNM: test cache, touch cloud manifest only 2026-05-21 16:44:43 -05:00
J. Nick Koston 5975f4b179 Skip cache walking when --cache is not passed
Address Copilot review feedback on the cache PR:

* Split collect_tests into _collect_tests_uncached (the original
  directory-based pre-cache flow) and _collect_tests_cached (walks
  the tree to build per-file hashes).  Without --cache we now skip
  the walk + per-file hash entirely.
* A single-file root has no ancestor conftests to walk, so the
  conftest_hash would always be empty and stale counts could survive
  a real conftest change; bypass the cache for the file-root case.
* Save the cache file with explicit utf-8 encoding and
  ensure_ascii=False.
2026-05-21 16:08:44 -05:00
J. Nick Koston 9ed16b63a3 Cache per-file test counts in split_tests
Persist the result of pytest --collect-only between CI runs as a JSON
file keyed by content hash, so unchanged test files are served from
cache and only edited or new files are re-collected.  The cache is
self-healing:

* Missing, corrupt, or wrong-version files fall back to a full collect.
* Any conftest.py change anywhere under the test root invalidates the
  whole cache, so fixture parametrization shifts cannot silently skew
  counts.
* Files pytest returns nothing for (helper modules named test_*.py with
  no test functions) are cached as zero so they don't get re-collected
  forever.

Walking is done once with os.walk (~2x faster than Path.rglob) and
collects test files plus conftests in a single pass.  When the cache
is fully cold we feed pytest top-level directories rather than
thousands of file paths so cold runs stay as fast as before the cache
landed.

Wire the new --cache flag through the prepare-pytest-full job and back
the cache file with actions/cache so PRs can pick up the latest dev
snapshot via restore-keys.  Local timings: cold 11s, warm with no diff
0.4s, warm with one file edited 2.3s.
2026-05-21 15:56:08 -05:00
J. Nick Koston 8dadaa2f9e Filter fan-out children and fail fast on empty batch list
Only pass directories and test_*.py files to pytest --collect-only so
helpers like tests/components/conftest.py and tests/components/common.py
are not treated as explicit collection targets, and bail out with a
clear error if no eligible paths are found instead of running pytest
with no arguments.
2026-05-21 15:17:42 -05:00
J. Nick Koston 4f98c71586 Run pytest --collect-only in parallel batches in split_tests
cProfile showed 99.6% of split_tests.py wall time was spent in the
single pytest --collect-only subprocess.  Fan out the collection across
``os.cpu_count()`` workers; round-robin chunking keeps each batch
roughly equal, and tests/components is expanded one level deeper so
the ~1000 integration subdirectories distribute evenly.  Local wall
time dropped from ~132s to ~11s on an 18-core box.  Bucket output is
unchanged because we still parse the same pytest -qq output, just
aggregated from multiple invocations.
2026-05-21 15:10:01 -05:00
51 changed files with 1340 additions and 2045 deletions
+38 -1
View File
@@ -853,12 +853,49 @@ jobs:
key: >-
${{ runner.os }}-${{ runner.arch }}-${{ steps.python.outputs.python-version }}-${{
needs.info.outputs.python_cache_key }}
- name: Restore pytest test counts cache
id: cache-pytest-counts
uses: actions/cache/restore@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: pytest_test_counts.json
# Primary key is a sentinel; restore-keys pick the most recent
# prefix match since the real (content-addressed) key isn't
# known until split_tests.py runs below.
key: >-
pytest-counts-${{ runner.os }}-${{ runner.arch }}-${{
steps.python.outputs.python-version }}-${{
needs.info.outputs.python_cache_key }}-restore-sentinel
restore-keys: |
pytest-counts-${{ runner.os }}-${{ runner.arch }}-${{ steps.python.outputs.python-version }}-${{ needs.info.outputs.python_cache_key }}-
- name: Run split_tests.py
env:
TEST_GROUP_COUNT: ${{ needs.info.outputs.test_group_count }}
run: |
. venv/bin/activate
python -m script.split_tests ${TEST_GROUP_COUNT} tests
python -m script.split_tests \
--cache pytest_test_counts.json \
${TEST_GROUP_COUNT} tests
- name: Hash pytest test counts cache
id: cache-pytest-counts-hash
run: |
echo "hash=$(sha256sum pytest_test_counts.json | cut -d' ' -f1)" \
>> "$GITHUB_OUTPUT"
- name: Save pytest test counts cache
# Content-addressed key: identical content reuses the same entry.
# Skip the save when the restore already matched that hash.
if: >-
!endsWith(
steps.cache-pytest-counts.outputs.cache-matched-key,
steps.cache-pytest-counts-hash.outputs.hash
)
uses: actions/cache/save@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5.0.5
with:
path: pytest_test_counts.json
key: >-
pytest-counts-${{ runner.os }}-${{ runner.arch }}-${{
steps.python.outputs.python-version }}-${{
needs.info.outputs.python_cache_key }}-${{
steps.cache-pytest-counts-hash.outputs.hash }}
- name: Upload pytest_buckets
uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
with:
Generated
+2 -2
View File
@@ -2054,8 +2054,8 @@ CLAUDE.md @home-assistant/core
/tests/components/yamaha_musiccast/ @vigonotion @micha91
/homeassistant/components/yandex_transport/ @rishatik92 @devbis
/tests/components/yandex_transport/ @rishatik92 @devbis
/homeassistant/components/yardian/ @aeon-matrix
/tests/components/yardian/ @aeon-matrix
/homeassistant/components/yardian/ @h3l1o5
/tests/components/yardian/ @h3l1o5
/homeassistant/components/yeelight/ @zewelor @shenxn @starkillerOG @alexyao2015
/tests/components/yeelight/ @zewelor @shenxn @starkillerOG @alexyao2015
/homeassistant/components/yeelightsunflower/ @lindsaymarkward
@@ -51,6 +51,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: S3ConfigEntry) -> bool:
translation_key="invalid_bucket_name",
) from err
except ValueError as err:
# pylint: disable-next=home-assistant-exception-translation-key-missing
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_endpoint_url",
+1 -4
View File
@@ -7,7 +7,7 @@
"cannot_connect": "[%key:component::aws_s3::exceptions::cannot_connect::message%]",
"invalid_bucket_name": "[%key:component::aws_s3::exceptions::invalid_bucket_name::message%]",
"invalid_credentials": "[%key:component::aws_s3::exceptions::invalid_credentials::message%]",
"invalid_endpoint_url": "[%key:component::aws_s3::exceptions::invalid_endpoint_url::message%]"
"invalid_endpoint_url": "Invalid endpoint URL. Please make sure it's a valid AWS S3 endpoint URL."
},
"step": {
"user": {
@@ -48,9 +48,6 @@
},
"invalid_credentials": {
"message": "Bucket cannot be accessed using provided combination of access key ID and secret access key."
},
"invalid_endpoint_url": {
"message": "Invalid endpoint URL. Please make sure it's a valid AWS S3 endpoint URL."
}
}
}
@@ -20,7 +20,7 @@
"bluetooth-adapters==2.3.0",
"bluetooth-auto-recovery==1.6.4",
"bluetooth-data-tools==1.29.18",
"dbus-fast==5.0.15",
"habluetooth==6.7.9"
"dbus-fast==5.0.14",
"habluetooth==6.7.4"
]
}
@@ -38,9 +38,6 @@ from homeassistant.const import (
from homeassistant.core import Event, HomeAssistant, ServiceCall, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv, entity_registry as er
from homeassistant.helpers.entity_platform import (
async_create_platform_config_not_supported_issue,
)
from homeassistant.helpers.event import (
async_track_time_interval,
async_track_utc_time_change,
@@ -382,8 +379,8 @@ async def async_extract_config(
if platform.type == PLATFORM_TYPE_LEGACY:
legacy.append(platform)
else:
async_create_platform_config_not_supported_issue(
hass, platform.name, DOMAIN
raise ValueError(
f"Unable to determine type for {platform.name}: {platform.type}"
)
return legacy
@@ -19,7 +19,7 @@ DEFAULT_STT_PROMPT = "Transcribe the attached audio"
CONF_RECOMMENDED = "recommended"
CONF_CHAT_MODEL = "chat_model"
RECOMMENDED_CHAT_MODEL = "models/gemini-3.1-flash-lite"
RECOMMENDED_CHAT_MODEL = "models/gemini-2.5-flash"
RECOMMENDED_STT_MODEL = RECOMMENDED_CHAT_MODEL
RECOMMENDED_TTS_MODEL = "models/gemini-2.5-flash-preview-tts"
RECOMMENDED_IMAGE_MODEL = "models/gemini-2.5-flash-image"
@@ -2,7 +2,7 @@
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, override
from typing import Any
from incomfortclient import Heater as InComfortHeater
@@ -97,13 +97,11 @@ class IncomfortBinarySensor(IncomfortBoilerEntity, BinarySensorEntity):
self._attr_unique_id = f"{heater.serial_no}_{description.key}"
@property
@override
def is_on(self) -> bool:
"""Return the status of the sensor."""
return bool(self._heater.status[self.entity_description.value_key])
@property
@override
def extra_state_attributes(self) -> dict[str, Any] | None:
"""Return the device state attributes."""
if (attributes_fn := self.entity_description.extra_state_attributes_fn) is None:
@@ -1,6 +1,6 @@
"""Support for an Intergas boiler via an InComfort/InTouch Lan2RF gateway."""
from typing import Any, override
from typing import Any
from incomfortclient import Heater as InComfortHeater, Room as InComfortRoom
@@ -76,19 +76,16 @@ class InComfortClimate(IncomfortEntity, ClimateEntity):
)
@property
@override
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the device state attributes."""
return {"status": self._room.status}
@property
@override
def current_temperature(self) -> float | None:
"""Return the current temperature."""
return self._room.room_temp
@property
@override
def hvac_action(self) -> HVACAction | None:
"""Return the actual current HVAC action."""
if self._heater.is_burning and self._heater.is_pumping:
@@ -96,7 +93,6 @@ class InComfortClimate(IncomfortEntity, ClimateEntity):
return HVACAction.IDLE
@property
@override
def target_temperature(self) -> float | None:
"""Return the (override)temperature we try to reach.
@@ -110,13 +106,11 @@ class InComfortClimate(IncomfortEntity, ClimateEntity):
return self._room.setpoint
return self._room.override or self._room.setpoint
@override
async def async_set_temperature(self, **kwargs: Any) -> None:
"""Set a new target temperature for this zone."""
temperature: float = kwargs[ATTR_TEMPERATURE]
await self._room.set_override(temperature)
await self.coordinator.async_refresh()
@override
async def async_set_hvac_mode(self, hvac_mode: HVACMode) -> None:
"""Set new target hvac mode."""
@@ -2,7 +2,7 @@
from collections.abc import Mapping
import logging
from typing import Any, override
from typing import Any
from incomfortclient import InvalidGateway, InvalidHeaterList
import voluptuous as vol
@@ -100,7 +100,6 @@ class InComfortConfigFlow(ConfigFlow, domain=DOMAIN):
_discovered_host: str
@override
@staticmethod
@callback
def async_get_options_flow(
@@ -109,7 +108,6 @@ class InComfortConfigFlow(ConfigFlow, domain=DOMAIN):
"""Get the options flow for this handler."""
return InComfortOptionsFlowHandler()
@override
async def async_step_dhcp(
self, discovery_info: DhcpServiceInfo
) -> ConfigFlowResult:
@@ -171,7 +169,6 @@ class InComfortConfigFlow(ConfigFlow, domain=DOMAIN):
description_placeholders={CONF_HOST: self._discovered_host},
)
@override
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
@@ -3,7 +3,7 @@
from dataclasses import dataclass, field
from datetime import timedelta
import logging
from typing import Any, override
from typing import Any
from aiohttp import ClientResponseError
from incomfortclient import (
@@ -74,7 +74,6 @@ class InComfortDataCoordinator(DataUpdateCoordinator[InComfortData]):
)
self.incomfort_data = incomfort_data
@override
async def _async_update_data(self) -> InComfortData:
"""Fetch data from API endpoint."""
try:
+1 -3
View File
@@ -1,7 +1,7 @@
"""Support for an Intergas heater via an InComfort/InTouch Lan2RF gateway."""
from dataclasses import dataclass
from typing import Any, override
from typing import Any
from incomfortclient import Heater as InComfortHeater
@@ -104,13 +104,11 @@ class IncomfortSensor(IncomfortBoilerEntity, SensorEntity):
self._attr_unique_id = f"{heater.serial_no}_{description.key}"
@property
@override
def native_value(self) -> StateType:
"""Return the state of the sensor."""
return self._heater.status[self.entity_description.value_key] # type: ignore [no-any-return]
@property
@override
def extra_state_attributes(self) -> dict[str, Any] | None:
"""Return the device state attributes."""
if (extra_key := self.entity_description.extra_key) is None:
@@ -1,7 +1,7 @@
"""Support for an Intergas boiler via an InComfort/Intouch Lan2RF gateway."""
import logging
from typing import Any, override
from typing import Any
from incomfortclient import Heater as InComfortHeater
@@ -49,13 +49,11 @@ class IncomfortWaterHeater(IncomfortBoilerEntity, WaterHeaterEntity):
self._attr_unique_id = heater.serial_no
@property
@override
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the device state attributes."""
return {k: v for k, v in self._heater.status.items() if k in HEATER_ATTRS}
@property
@override
def current_temperature(self) -> float | None:
"""Return the current temperature."""
if self._heater.is_tapping:
@@ -69,7 +67,6 @@ class IncomfortWaterHeater(IncomfortBoilerEntity, WaterHeaterEntity):
return max(self._heater.heater_temp, self._heater.tap_temp)
@property
@override
def current_operation(self) -> str | None:
"""Return the current operation mode."""
return self._heater.display_text
@@ -8,6 +8,8 @@ import datetime
from functools import partial
from random import random
import voluptuous as vol
from homeassistant.components.labs import (
EventLabsUpdatedData,
async_is_preview_feature_enabled,
@@ -32,7 +34,7 @@ from homeassistant.const import (
UnitOfTemperature,
UnitOfVolume,
)
from homeassistant.core import HomeAssistant
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.device_registry import DeviceEntry
from homeassistant.helpers.issue_registry import (
@@ -49,11 +51,9 @@ from homeassistant.util.unit_conversion import (
)
from .const import DATA_BACKUP_AGENT_LISTENERS, DOMAIN
from .services import async_setup_services
COMPONENTS_WITH_DEMO_PLATFORM = [
Platform.BUTTON,
Platform.DEVICE_TRACKER,
Platform.FAN,
Platform.EVENT,
Platform.IMAGE,
@@ -69,6 +69,15 @@ COMPONENTS_WITH_DEMO_PLATFORM = [
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
SCHEMA_SERVICE_TEST_SERVICE_1 = vol.Schema(
{
vol.Required("field_1"): vol.Coerce(int),
vol.Required("field_2"): vol.In(["off", "auto", "cool"]),
vol.Optional("field_3"): vol.Coerce(int),
vol.Optional("field_4"): vol.In(["forwards", "reverse"]),
}
)
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the demo environment."""
@@ -78,7 +87,24 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
)
)
async_setup_services(hass)
@callback
def service_handler(call: ServiceCall | None = None) -> ServiceResponse:
"""Do nothing."""
return None
hass.services.async_register(
DOMAIN,
"test_service_1",
service_handler,
SCHEMA_SERVICE_TEST_SERVICE_1,
description_placeholders={
"meep_1": "foo",
"meep_2": "bar",
"meep_3": "beer",
"meep_4": "milk",
"meep_5": "https://example.com",
},
)
return True
@@ -1,97 +0,0 @@
"""Demo platform that has a couple of fake device trackers."""
from homeassistant.components.device_tracker import (
BaseScannerEntity,
SourceType,
TrackerEntity,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
async def async_setup_entry(
hass: HomeAssistant,
config_entry: ConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Everything but the Kitchen Sink config entry."""
async_add_entities(
[
DemoTracker(
unique_id="kitchen_sink_tracker_001",
name="Demo tracker",
latitude=hass.config.latitude,
longitude=hass.config.longitude,
accuracy=10,
),
DemoScanner(
unique_id="kitchen_sink_scanner_001",
name="Demo scanner",
is_connected=True,
),
]
)
class DemoTracker(TrackerEntity):
"""Representation of a demo tracker."""
_attr_should_poll = False
_attr_source_type = SourceType.GPS
def __init__(
self,
*,
unique_id: str,
name: str,
latitude: float | None,
longitude: float | None,
accuracy: float,
) -> None:
"""Initialize the tracker."""
self._attr_unique_id = unique_id
self._attr_name = name
self._attr_latitude = latitude
self._attr_longitude = longitude
self._attr_location_accuracy = accuracy
@callback
def async_set_tracker_location(
self, latitude: float, longitude: float, accuracy: float
) -> None:
"""Update the tracker location."""
self._attr_latitude = latitude
self._attr_longitude = longitude
self._attr_location_accuracy = accuracy
self.async_write_ha_state()
class DemoScanner(BaseScannerEntity):
"""Representation of a demo scanner."""
_attr_should_poll = False
_attr_source_type = SourceType.ROUTER
def __init__(
self,
*,
unique_id: str,
name: str,
is_connected: bool,
) -> None:
"""Initialize the scanner."""
self._attr_unique_id = unique_id
self._attr_name = name
self._is_connected = is_connected
@property
def is_connected(self) -> bool:
"""Return true if the device is connected."""
return self._is_connected
@callback
def async_set_scanner_connected(self, connected: bool) -> None:
"""Update the scanner connected state."""
self._is_connected = connected
self.async_write_ha_state()
@@ -9,12 +9,6 @@
}
},
"services": {
"set_scanner_connected": {
"service": "mdi:lan-connect"
},
"set_tracker_location": {
"service": "mdi:map-marker"
},
"test_service_1": {
"sections": {
"additional_fields": "mdi:test-tube"
@@ -1,72 +0,0 @@
"""Services for the Everything but the Kitchen Sink integration."""
import voluptuous as vol
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER_DOMAIN
from homeassistant.const import ATTR_LATITUDE, ATTR_LONGITUDE
from homeassistant.core import HomeAssistant, ServiceCall, ServiceResponse, callback
from homeassistant.helpers import config_validation as cv, service
from .const import DOMAIN
SCHEMA_SERVICE_TEST_SERVICE_1 = vol.Schema(
{
vol.Required("field_1"): vol.Coerce(int),
vol.Required("field_2"): vol.In(["off", "auto", "cool"]),
vol.Optional("field_3"): vol.Coerce(int),
vol.Optional("field_4"): vol.In(["forward", "reverse"]),
}
)
SERVICE_TEST_SERVICE_1 = "test_service_1"
SERVICE_SET_TRACKER_LOCATION = "set_tracker_location"
SERVICE_SET_SCANNER_CONNECTED = "set_scanner_connected"
ATTR_ACCURACY = "accuracy"
ATTR_CONNECTED = "connected"
@callback
def async_setup_services(hass: HomeAssistant) -> None:
"""Register services for the Kitchen Sink integration."""
@callback
def service_handler(call: ServiceCall) -> ServiceResponse:
"""Do nothing."""
return None
hass.services.async_register(
DOMAIN,
SERVICE_TEST_SERVICE_1,
service_handler,
SCHEMA_SERVICE_TEST_SERVICE_1,
description_placeholders={
"meep_1": "foo",
"meep_2": "bar",
"meep_3": "beer",
"meep_4": "milk",
"meep_5": "https://example.com",
},
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
SERVICE_SET_TRACKER_LOCATION,
entity_domain=DEVICE_TRACKER_DOMAIN,
schema={
vol.Required(ATTR_LATITUDE): cv.latitude,
vol.Required(ATTR_LONGITUDE): cv.longitude,
vol.Required(ATTR_ACCURACY): vol.All(vol.Coerce(float), vol.Range(min=0)),
},
func="async_set_tracker_location",
)
service.async_register_platform_entity_service(
hass,
DOMAIN,
SERVICE_SET_SCANNER_CONNECTED,
entity_domain=DEVICE_TRACKER_DOMAIN,
schema={vol.Required(ATTR_CONNECTED): cv.boolean},
func="async_set_scanner_connected",
)
@@ -30,44 +30,3 @@ test_service_1:
options:
- "forward"
- "reverse"
set_tracker_location:
target:
entity:
integration: kitchen_sink
domain: device_tracker
fields:
latitude:
required: true
example: 52.379189
selector:
number:
min: -90
max: 90
step: any
longitude:
required: true
example: 4.899431
selector:
number:
min: -180
max: 180
step: any
accuracy:
required: true
example: 10
selector:
number:
min: 0
max: 10000
unit_of_measurement: m
set_scanner_connected:
target:
entity:
integration: kitchen_sink
domain: device_tracker
fields:
connected:
required: true
example: true
selector:
boolean:
@@ -135,34 +135,6 @@
}
},
"services": {
"set_scanner_connected": {
"description": "Sets the connected state of a demo scanner entity.",
"fields": {
"connected": {
"description": "Whether the device should be reported as connected.",
"name": "Connected"
}
},
"name": "Set scanner connected"
},
"set_tracker_location": {
"description": "Sets the location and accuracy of a demo tracker entity.",
"fields": {
"accuracy": {
"description": "Location accuracy in meters.",
"name": "Accuracy"
},
"latitude": {
"description": "Latitude of the new location.",
"name": "Latitude"
},
"longitude": {
"description": "Longitude of the new location.",
"name": "Longitude"
}
},
"name": "Set tracker location"
},
"test_service_1": {
"description": "Fake action for testing {meep_2}",
"fields": {
+73 -103
View File
@@ -1,5 +1,7 @@
"""Support for OPNsense Routers."""
import logging
from aiopnsense import (
OPNsenseBelowMinFirmware,
OPNsenseClient,
@@ -13,16 +15,22 @@ from aiopnsense import (
)
import voluptuous as vol
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.const import CONF_API_KEY, CONF_URL, CONF_VERIFY_SSL, Platform
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.discovery import load_platform
from homeassistant.helpers.typing import ConfigType
from .const import CONF_API_SECRET, CONF_TRACKER_INTERFACES, DOMAIN
from .types import OPNsenseConfigEntry, OPNsenseRuntimeData
from .const import (
CONF_API_SECRET,
CONF_INTERFACE_CLIENT,
CONF_TRACKER_INTERFACES,
DOMAIN,
OPNSENSE_DATA,
)
_LOGGER = logging.getLogger(__name__)
CONFIG_SCHEMA = vol.Schema(
{
@@ -41,124 +49,86 @@ CONFIG_SCHEMA = vol.Schema(
extra=vol.ALLOW_EXTRA,
)
PLATFORMS = [Platform.DEVICE_TRACKER]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the OPNsense component."""
if DOMAIN not in config:
return True
"""Set up the opnsense component."""
hass.async_create_task(_async_setup(hass, config))
conf = config[DOMAIN]
url = conf[CONF_URL]
api_key = conf[CONF_API_KEY]
api_secret = conf[CONF_API_SECRET]
verify_ssl = conf[CONF_VERIFY_SSL]
tracker_interfaces = conf[CONF_TRACKER_INTERFACES]
return True
async def _async_setup(hass: HomeAssistant, config: ConfigType) -> None:
"""Set up the OPNsense component from YAML."""
await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=config[DOMAIN],
)
async def async_setup_entry(
hass: HomeAssistant, config_entry: OPNsenseConfigEntry
) -> bool:
"""Set up the OPNsense component from a config entry."""
url = config_entry.data[CONF_URL]
session = async_get_clientsession(
hass, verify_ssl=config_entry.data[CONF_VERIFY_SSL]
)
session = async_get_clientsession(hass, verify_ssl=verify_ssl)
client = OPNsenseClient(
url,
config_entry.data[CONF_API_KEY],
config_entry.data[CONF_API_SECRET],
api_key,
api_secret,
session,
opts={"verify_ssl": config_entry.data[CONF_VERIFY_SSL]},
opts={"verify_ssl": verify_ssl},
)
tracker_interfaces = config_entry.data.get(CONF_TRACKER_INTERFACES, [])
try:
await client.validate()
if tracker_interfaces:
interfaces_resp = await client.get_interfaces()
except OPNsenseUnknownFirmware as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="unknown_firmware",
translation_placeholders={"url": url},
) from err
except OPNsenseBelowMinFirmware as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="firmware_too_old",
translation_placeholders={"url": url},
) from err
except OPNsenseInvalidURL as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_url",
translation_placeholders={"url": url},
) from err
except OPNsenseTimeoutError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="timeout_connecting",
translation_placeholders={"url": url},
) from err
except OPNsenseSSLError as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="ssl_error",
translation_placeholders={"url": url},
) from err
except OPNsenseInvalidAuth as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="invalid_auth",
translation_placeholders={"url": url},
) from err
except OPNsensePrivilegeMissing as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="privilege_missing",
translation_placeholders={"url": url},
) from err
except OPNsenseConnectionError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="cannot_connect",
translation_placeholders={"url": url},
) from err
except OPNsenseUnknownFirmware:
_LOGGER.error("Error checking the OPNsense firmware version at %s", url)
return False
except OPNsenseBelowMinFirmware:
_LOGGER.error(
"OPNsense Firmware is below the minimum supported version at %s", url
)
return False
except OPNsenseInvalidURL:
_LOGGER.error(
"Invalid URL while connecting to OPNsense API endpoint at %s", url
)
return False
except OPNsenseTimeoutError:
_LOGGER.error("Timeout while connecting to OPNsense API endpoint at %s", url)
return False
except OPNsenseSSLError:
_LOGGER.error(
"Unable to verify SSL while connecting to OPNsense API endpoint at %s", url
)
return False
except OPNsenseInvalidAuth:
_LOGGER.error(
"Authentication failure while connecting to OPNsense API endpoint at %s",
url,
)
return False
except OPNsensePrivilegeMissing:
_LOGGER.error(
"Invalid Permissions while connecting to OPNsense API endpoint at %s",
url,
)
return False
except OPNsenseConnectionError:
_LOGGER.error(
"Connection failure while connecting to OPNsense API endpoint at %s",
url,
)
return False
if tracker_interfaces:
# Verify that specified tracker interfaces are valid
known_interfaces = [
name for ifinfo in interfaces_resp.values() if (name := ifinfo.get("name"))
ifinfo.get("name", "") for ifinfo in interfaces_resp.values()
]
for intf_description in tracker_interfaces:
if intf_description not in known_interfaces:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="tracker_interface_not_found",
translation_placeholders={
"interface": intf_description,
"known": ", ".join(known_interfaces),
},
_LOGGER.error(
"Specified OPNsense tracker interface %s is not found",
intf_description,
)
return False
config_entry.runtime_data = OPNsenseRuntimeData(
client=client,
tracker_interfaces=tracker_interfaces,
)
hass.data[OPNSENSE_DATA] = {
CONF_INTERFACE_CLIENT: client,
CONF_TRACKER_INTERFACES: tracker_interfaces,
}
await hass.config_entries.async_forward_entry_setups(config_entry, PLATFORMS)
load_platform(hass, Platform.DEVICE_TRACKER, DOMAIN, tracker_interfaces, config)
return True
async def async_unload_entry(
hass: HomeAssistant, config_entry: OPNsenseConfigEntry
) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(config_entry, PLATFORMS)
@@ -1,315 +0,0 @@
"""Config flow for OPNsense."""
import logging
from typing import Any
from aiopnsense import (
OPNsenseBelowMinFirmware,
OPNsenseClient,
OPNsenseConnectionError,
OPNsenseInvalidAuth,
OPNsenseInvalidURL,
OPNsensePrivilegeMissing,
OPNsenseSSLError,
OPNsenseTimeoutError,
OPNsenseUnknownFirmware,
)
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_API_KEY, CONF_URL, CONF_VERIFY_SSL
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.issue_registry import (
IssueSeverity,
async_create_issue,
async_delete_issue,
)
from homeassistant.helpers.selector import (
SelectSelector,
SelectSelectorConfig,
SelectSelectorMode,
)
from .const import CONF_API_SECRET, CONF_TRACKER_INTERFACES, DOMAIN
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_URL): str,
vol.Required(CONF_API_KEY): str,
vol.Required(CONF_API_SECRET): str,
vol.Required(CONF_VERIFY_SSL, default=True): bool,
}
)
def tracker_interfaces_schema(
interfaces: list[str], selected: list[str] | None = None
) -> vol.Schema:
"""Schema to display available interfaces for device tracking selection."""
return vol.Schema(
{
vol.Optional(
CONF_TRACKER_INTERFACES,
default=selected or [],
): SelectSelector(
SelectSelectorConfig(
options=interfaces, mode=SelectSelectorMode.DROPDOWN, multiple=True
)
),
}
)
class OPNsenseConfigFlow(ConfigFlow, domain=DOMAIN):
"""OPNsense config flow."""
def __init__(self) -> None:
"""Initialize OPNsense config flow."""
self.available_interfaces: list[str] | None = None
self._entry_data: dict[str, Any] = {}
async def _show_setup_form(
self,
user_input: dict[Any, Any] | None = None,
errors: dict[Any, Any] | None = None,
) -> ConfigFlowResult:
"""Show the setup form to the user."""
if user_input is None:
user_input = {}
description_placeholders = {
"doc_url": "https://www.home-assistant.io/integrations/opnsense/"
}
return self.async_show_form(
step_id="user",
data_schema=self.add_suggested_values_to_schema(
STEP_USER_DATA_SCHEMA, user_input
),
errors=errors or {},
description_placeholders=description_placeholders,
)
async def _show_interfaces_form(
self,
user_input: dict[Any, Any],
errors: dict[Any, Any] | None = None,
) -> ConfigFlowResult:
"""Show the tracker interfaces selection form to the user."""
return self.async_show_form(
step_id="interfaces",
data_schema=self.add_suggested_values_to_schema(
tracker_interfaces_schema(
self.available_interfaces or [],
user_input.get(CONF_TRACKER_INTERFACES),
),
user_input,
),
errors=errors or {},
)
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle user step: credentials and connection test."""
errors = {}
if user_input is None:
return await self._show_setup_form(user_input, None)
verify_ssl = user_input[CONF_VERIFY_SSL]
session = async_get_clientsession(self.hass, verify_ssl=verify_ssl)
client = OPNsenseClient(
user_input[CONF_URL],
user_input[CONF_API_KEY],
user_input[CONF_API_SECRET],
session,
opts={"verify_ssl": verify_ssl},
)
try:
await client.validate()
interfaces_resp = await client.get_interfaces()
known_interfaces = [
name
for ifinfo in interfaces_resp.values()
if (name := ifinfo.get("name"))
]
self.available_interfaces = list(known_interfaces)
except OPNsenseInvalidAuth:
errors["base"] = "invalid_auth"
except OPNsensePrivilegeMissing:
errors["base"] = "privilege_missing"
except OPNsenseInvalidURL:
errors["base"] = "invalid_url"
except OPNsenseSSLError:
errors["base"] = "ssl_error"
except OPNsenseConnectionError, OPNsenseTimeoutError:
errors["base"] = "cannot_connect"
except OPNsenseUnknownFirmware:
errors["base"] = "unknown_version"
except OPNsenseBelowMinFirmware:
errors["base"] = "invalid_version"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
else:
unique_id = await client.get_device_unique_id()
if not unique_id:
return self.async_abort(reason="no_unique_id")
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
self._entry_data = user_input
return await self.async_step_interfaces()
return await self._show_setup_form(user_input, errors)
async def async_step_interfaces(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle tracker interface selection step."""
if user_input is None:
return await self._show_interfaces_form({}, None)
if user_input.get(CONF_TRACKER_INTERFACES):
self._entry_data[CONF_TRACKER_INTERFACES] = user_input[
CONF_TRACKER_INTERFACES
]
return self.async_create_entry(
title=self._entry_data[CONF_URL], data=self._entry_data
)
async def async_step_import(self, import_data: dict[str, Any]) -> ConfigFlowResult:
"""Import a Yaml config."""
# Test connection
session = async_get_clientsession(
self.hass, verify_ssl=import_data[CONF_VERIFY_SSL]
)
client = OPNsenseClient(
import_data[CONF_URL],
import_data[CONF_API_KEY],
import_data[CONF_API_SECRET],
session,
opts={"verify_ssl": import_data[CONF_VERIFY_SSL]},
)
try:
await client.validate()
interfaces_resp = await client.get_interfaces()
except OPNsenseInvalidURL:
return self._abort_import(reason="invalid_url")
except OPNsenseInvalidAuth:
return self._abort_import(reason="invalid_auth")
except OPNsensePrivilegeMissing:
return self._abort_import(reason="privilege_missing")
except OPNsenseSSLError:
return self._abort_import(reason="ssl_error")
except OPNsenseConnectionError, OPNsenseTimeoutError:
return self._abort_import(reason="cannot_connect")
except OPNsenseUnknownFirmware:
return self._abort_import(reason="unknown_version")
except OPNsenseBelowMinFirmware:
return self._abort_import(reason="invalid_version")
except Exception: # Allowed in config flows
_LOGGER.exception("Unexpected exception during import")
return self._abort_import(reason="unknown")
async_create_issue(
self.hass,
HOMEASSISTANT_DOMAIN,
f"deprecated_yaml_{DOMAIN}",
breaks_in_ha_version="2026.12.0",
is_fixable=False,
issue_domain=DOMAIN,
severity=IssueSeverity.WARNING,
translation_key="deprecated_yaml",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "OPNsense",
},
)
unique_id = await client.get_device_unique_id()
if not unique_id:
return self._abort_import(reason="no_unique_id")
await self.async_set_unique_id(unique_id)
self._abort_if_unique_id_configured()
# Validate CONF_TRACKER_INTERFACES if present and not empty
verified_data = dict(import_data)
if CONF_TRACKER_INTERFACES in verified_data:
if not verified_data[CONF_TRACKER_INTERFACES]:
verified_data.pop(CONF_TRACKER_INTERFACES)
else:
known_interfaces = [
name
for ifinfo in interfaces_resp.values()
if (name := ifinfo.get("name"))
]
self.available_interfaces = sorted(known_interfaces)
# Abort import if any specified tracker interface is not found
missing = [
intf_description
for intf_description in verified_data[CONF_TRACKER_INTERFACES]
if intf_description not in known_interfaces
]
if missing:
# Create a repair to guide the user
async_create_issue(
self.hass,
DOMAIN,
"import_failed_missing_interfaces",
breaks_in_ha_version="2026.12.0",
is_fixable=False,
severity=IssueSeverity.ERROR,
translation_key="import_failed_missing_interfaces",
translation_placeholders={
"missing": ", ".join(missing),
"found": ", ".join(known_interfaces),
"integration_title": "OPNsense",
},
)
return self.async_abort(
reason="import_failed_missing_interfaces",
description_placeholders={
"missing": ", ".join(missing),
"found": ", ".join(known_interfaces),
"integration_title": "OPNsense",
},
)
# Clear any previous import issues if interfaces are now valid
async_delete_issue(
self.hass,
DOMAIN,
"import_failed_missing_interfaces",
)
return self.async_create_entry(
title=verified_data[CONF_URL], data=verified_data
)
def _abort_import(self, reason: str) -> ConfigFlowResult:
"""Create an issue for import errors and abort the import."""
async_create_issue(
self.hass,
DOMAIN,
f"import_failed_{reason}",
breaks_in_ha_version="2026.12.0",
is_fixable=False,
severity=IssueSeverity.ERROR,
translation_key=f"import_failed_{reason}",
translation_placeholders={
"domain": DOMAIN,
"integration_title": "OPNsense",
},
)
return self.async_abort(
reason=reason,
description_placeholders={
"integration_title": "OPNsense",
},
)
+2 -5
View File
@@ -1,11 +1,8 @@
"""Constants for OPNsense component."""
from datetime import timedelta
DOMAIN = "opnsense"
OPNSENSE_DATA = DOMAIN
CONF_API_SECRET = "api_secret"
CONF_INTERFACE_CLIENT = "interface_client"
CONF_TRACKER_INTERFACES = "tracker_interfaces"
# Update interval for device scanning
SCAN_INTERVAL = timedelta(seconds=30)
@@ -1,80 +0,0 @@
"""Coordinator for OPNsense device tracker updates."""
import logging
from aiopnsense import (
OPNsenseBelowMinFirmware,
OPNsenseClient,
OPNsenseConnectionError,
OPNsenseInvalidAuth,
OPNsenseInvalidURL,
OPNsensePrivilegeMissing,
OPNsenseSSLError,
OPNsenseTimeoutError,
OPNsenseUnknownFirmware,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.device_registry import format_mac
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import SCAN_INTERVAL
from .types import DeviceDetails, DeviceDetailsByMAC, OPNsenseConfigEntry
_LOGGER = logging.getLogger(__name__)
class OPNsenseDeviceTrackerCoordinator(DataUpdateCoordinator[DeviceDetailsByMAC]):
"""Coordinator for OPNsense device tracker updates."""
def __init__(
self,
hass: HomeAssistant,
config_entry: OPNsenseConfigEntry,
client: OPNsenseClient,
interfaces: list[str],
) -> None:
"""Initialize the coordinator."""
super().__init__(
hass,
_LOGGER,
name="OPNsense Device Tracker",
update_interval=SCAN_INTERVAL,
config_entry=config_entry,
)
self.client = client
self.interfaces = interfaces
self.tracked_devices: set[str] = set()
def _get_mac_addrs(self, devices: list[DeviceDetails]) -> DeviceDetailsByMAC:
"""Create dict with mac address keys from list of devices."""
out_devices: DeviceDetailsByMAC = {}
for device in devices:
if not self.interfaces or device["intf_description"] in self.interfaces:
formatted_mac = format_mac(device["mac"])
out_devices[formatted_mac] = device
return out_devices
async def _async_update_data(self) -> DeviceDetailsByMAC:
"""Fetch data from OPNsense."""
try:
devices = await self.client.get_arp_table(True)
except (
OPNsenseInvalidAuth,
OPNsenseInvalidURL,
OPNsensePrivilegeMissing,
OPNsenseSSLError,
OPNsenseBelowMinFirmware,
OPNsenseUnknownFirmware,
) as err:
raise ConfigEntryError(f"Error with OPNsense configuration: {err}") from err
except (
OPNsenseConnectionError,
OPNsenseTimeoutError,
) as err:
raise UpdateFailed(
f"Error communicating with OPNsense router: {err}"
) from err
return self._get_mac_addrs(devices)
@@ -1,117 +1,71 @@
"""Device tracker support for OPNsense routers."""
from typing import Any
from typing import Any, NewType
from homeassistant.components.device_tracker import ScannerEntity
from aiopnsense import OPNsenseClient
from homeassistant.components.device_tracker import DeviceScanner
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from homeassistant.helpers.typing import ConfigType
from .coordinator import OPNsenseDeviceTrackerCoordinator
from .types import DeviceDetails, OPNsenseConfigEntry
from .const import CONF_INTERFACE_CLIENT, CONF_TRACKER_INTERFACES, OPNSENSE_DATA
DeviceDetails = NewType("DeviceDetails", dict[str, Any])
DeviceDetailsByMAC = NewType("DeviceDetailsByMAC", dict[str, DeviceDetails])
async def async_setup_entry(
hass: HomeAssistant,
entry: OPNsenseConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up device tracker for OPNsense component."""
client = entry.runtime_data.client
interfaces = entry.runtime_data.tracker_interfaces
coordinator = OPNsenseDeviceTrackerCoordinator(hass, entry, client, interfaces)
def _async_add_new_entities() -> None:
"""Add entities for newly discovered devices."""
if not coordinator.data:
return
entities = []
for mac_address in coordinator.data:
if mac_address in coordinator.tracked_devices:
continue
entity = OPNsenseDeviceTrackerEntity(coordinator, mac_address)
coordinator.tracked_devices.add(mac_address)
entities.append(entity)
if entities:
async_add_entities(entities)
entry.async_on_unload(coordinator.async_add_listener(_async_add_new_entities))
# Initial data fetch
await coordinator.async_config_entry_first_refresh()
_async_add_new_entities()
async def async_get_scanner(
hass: HomeAssistant, config: ConfigType
) -> DeviceScanner | None:
"""Configure the OPNsense device_tracker."""
return OPNsenseDeviceScanner(
hass.data[OPNSENSE_DATA][CONF_INTERFACE_CLIENT],
hass.data[OPNSENSE_DATA][CONF_TRACKER_INTERFACES],
)
class OPNsenseDeviceTrackerEntity(
CoordinatorEntity[OPNsenseDeviceTrackerCoordinator], ScannerEntity
):
"""Representation of a tracked device."""
class OPNsenseDeviceScanner(DeviceScanner):
"""This class queries a router running OPNsense."""
def __init__(
self,
coordinator: OPNsenseDeviceTrackerCoordinator,
mac_address: str,
) -> None:
"""Initialize the device tracker entity."""
super().__init__(coordinator)
self._attr_mac_address = mac_address
def __init__(self, client: OPNsenseClient, interfaces: list[str]) -> None:
"""Initialize the scanner."""
self.last_results: dict[str, Any] = {}
self.client = client
self.interfaces = interfaces
@property
def device_data(self) -> DeviceDetails | None:
"""Return device data for current device."""
if self.coordinator.data and self.mac_address in self.coordinator.data:
return self.coordinator.data[self.mac_address]
return None
def _get_mac_addrs(self, devices: list[DeviceDetails]) -> DeviceDetailsByMAC | dict:
"""Create dict with mac address keys from list of devices."""
out_devices = {}
for device in devices:
if not self.interfaces or device["intf_description"] in self.interfaces:
out_devices[device["mac"]] = device
return out_devices
@property
def is_connected(self) -> bool:
"""Return true if the device is connected to the network."""
return (
self.coordinator.data is not None
and self.mac_address in self.coordinator.data
)
async def async_scan_devices(self) -> list[str]:
"""Scan for new devices and return a list with found device IDs."""
await self._async_update_info()
return list(self.last_results)
@property
def name(self) -> str:
"""Return device name."""
device_data = self.device_data
if device_data and device_data.get("hostname"):
return str(device_data["hostname"])
return f"OPNsense {self.mac_address}"
def get_device_name(self, device: str) -> str | None:
"""Return the name of the given device or None if we don't know."""
if device not in self.last_results:
return None
return self.last_results[device].get("hostname") or None
@property
def ip_address(self) -> str | None:
"""Return the primary IP address of the device."""
device_data = self.device_data
if device_data:
return device_data.get("ip")
return None
async def _async_update_info(self) -> bool:
"""Ensure the information from the OPNsense router is up to date.
@property
def hostname(self) -> str | None:
"""Return hostname of the device."""
device_data = self.device_data
if device_data:
hostname = device_data.get("hostname")
return hostname or None
return None
Return boolean if scanning successful.
"""
devices = await self.client.get_arp_table(True)
self.last_results = self._get_mac_addrs(devices)
return True
@property
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes."""
device_data = self.device_data
if not device_data:
def get_extra_attributes(self, device: str) -> dict[Any, Any]:
"""Return the extra attrs of the given device."""
if device not in self.last_results:
return {}
attrs = {}
if manufacturer := device_data.get("manufacturer"):
attrs["manufacturer"] = manufacturer
if interface := device_data.get("intf_description"):
attrs["interface"] = interface
if expires := device_data.get("expires"):
attrs["expires"] = expires
return attrs
mfg = self.last_results[device].get("manufacturer")
if not mfg:
return {}
return {"manufacturer": mfg}
@@ -2,7 +2,6 @@
"domain": "opnsense",
"name": "OPNsense",
"codeowners": ["@HarlemSquirrel", "@Snuffy2"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/opnsense",
"integration_type": "hub",
"iot_class": "local_polling",
@@ -1,117 +0,0 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"import_failed_missing_interfaces": "The following OPNsense tracker interfaces were not found: {missing}. Found interfaces were: {found}. Please manually set up a config entry and remove the YAML config",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"invalid_url": "URL is invalid or unreachable",
"invalid_version": "Unsupported OPNsense firmware version",
"no_unique_id": "Could not determine a unique identifier for this OPNsense router. Please ensure the API key has sufficient privileges, and your OPNsense instance has physical ports with a MAC address.",
"privilege_missing": "The API key used does not have sufficient privileges. Please check the integration documentation for required permissions",
"ssl_error": "SSL certificate verification failed",
"unknown": "[%key:common::config_flow::error::unknown%]",
"unknown_version": "The OPNsense firmware version is unknown. Please ensure you are running a supported version of OPNsense and the user has the correct permissions."
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"invalid_interface": "Interface(s) do not exist",
"invalid_url": "URL is invalid or unreachable",
"invalid_version": "Unsupported OPNsense firmware version",
"privilege_missing": "[%key:component::opnsense::config::abort::privilege_missing%]",
"ssl_error": "SSL certificate verification failed",
"unknown": "[%key:common::config_flow::error::unknown%]",
"unknown_version": "The OPNsense firmware version is unknown. Please ensure you are running a supported version of OPNsense and the user has the correct permissions."
},
"step": {
"interfaces": {
"data": {
"tracker_interfaces": "Interface(s) to use for tracking devices"
},
"description": "Select the OPNsense interfaces to use for tracking devices. If no interfaces are selected then all interfaces will be used for tracking."
},
"user": {
"data": {
"api_key": "[%key:common::config_flow::data::api_key%]",
"api_secret": "API secret",
"url": "[%key:common::config_flow::data::url%]",
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
},
"description": "Set required parameters to connect to your router. For more information, please refer to the [integration documentation]({doc_url})"
}
}
},
"exceptions": {
"cannot_connect": {
"message": "Connection failure while connecting to OPNsense API endpoint at {url}"
},
"firmware_too_old": {
"message": "OPNsense firmware at {url} is below the minimum supported version"
},
"invalid_auth": {
"message": "Authentication failure while connecting to OPNsense API endpoint at {url}"
},
"invalid_url": {
"message": "Invalid URL while connecting to OPNsense API endpoint at {url}"
},
"privilege_missing": {
"message": "The API user connecting to {url} does not have sufficient privileges"
},
"ssl_error": {
"message": "Unable to verify SSL certificate while connecting to OPNsense API endpoint at {url}"
},
"timeout_connecting": {
"message": "Timeout while connecting to OPNsense API endpoint at {url}"
},
"tracker_interface_not_found": {
"message": "Configured tracker interface {interface} is not present on the OPNsense router. Known interfaces: {known}"
},
"unknown_firmware": {
"message": "Could not determine the OPNsense firmware version at {url}"
}
},
"issues": {
"import_failed_cannot_connect": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, a connection error occurred. Please correct your YAML configuration and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_invalid_auth": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, an authentication error occurred. Please correct your YAML configuration and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_invalid_url": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, an error occurred indicating the URL provided is invalid or unreachable. Please correct your YAML configuration and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_invalid_version": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, an error occurred indicating the OPNsense firmware version is unsupported. Please correct your YAML configuration and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_missing_interfaces": {
"description": "The following OPNsense tracker interfaces were not found: {missing}. Found interfaces were: {found}. Please manually set up a config entry and remove the YAML config",
"title": "The {integration_title} YAML import failed: Missing tracker interfaces"
},
"import_failed_no_unique_id": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, a unique identifier for the router could not be determined. Please ensure the API key has sufficient privileges, and your OPNsense instance has physical ports with a MAC address.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_privilege_missing": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, an error occurred indicating the API key used does not have sufficient privileges. Please check the integration documentation for required permissions, correct your YAML configuration, and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_ssl_error": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, an SSL error occurred. Please correct your YAML configuration and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_unknown": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, an unknown error occurred. Please correct your YAML configuration and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
},
"import_failed_unknown_version": {
"description": "Configuring {integration_title} via YAML is deprecated and will be removed in a future release. While importing your configuration, an error occurred indicating the OPNsense firmware version is unknown. Please correct your YAML configuration and restart Home Assistant, or remove the {domain} key from your configuration and configure the integration via the UI.",
"title": "The {integration_title} YAML configuration is being removed"
}
}
}
@@ -1,21 +0,0 @@
"""Types for OPNsense routers."""
from dataclasses import dataclass
from typing import Any
from aiopnsense import OPNsenseClient
from homeassistant.config_entries import ConfigEntry
@dataclass(slots=True)
class OPNsenseRuntimeData:
"""Runtime data for OPNsense config entries."""
client: OPNsenseClient
tracker_interfaces: list[str]
type DeviceDetails = dict[str, Any]
type DeviceDetailsByMAC = dict[str, DeviceDetails]
type OPNsenseConfigEntry = ConfigEntry[OPNsenseRuntimeData]
@@ -21,5 +21,5 @@
"documentation": "https://www.home-assistant.io/integrations/qingping",
"integration_type": "device",
"iot_class": "local_push",
"requirements": ["qingping-ble==1.1.5"]
"requirements": ["qingping-ble==1.1.4"]
}
@@ -217,9 +217,9 @@
"home": "[%key:common::entity::button::home::name%]",
"insert": "Insert",
"left": "[%key:common::entity::button::left::name%]",
"lights_kbd_down": "Keyboard backlight brightness down",
"lights_kbd_down": "Keyboasrd backlight brightness down",
"lights_kbd_toggle": "Toggle keyboard backlight",
"lights_kbd_up": "Keyboard backlight brightness up",
"lights_kbd_up": "Keyboard backlight brighness up",
"lights_mon_down": "Display brightness down",
"lights_mon_up": "Display brightness up",
"numpad_0": "NumPad 0",
+1 -2
View File
@@ -3,8 +3,7 @@
from pathlib import Path
from typing import Any
from tuya_device_handlers import TUYA_QUIRKS_REGISTRY
from tuya_device_handlers.devices import register_tuya_quirks
from tuya_device_handlers.devices import TUYA_QUIRKS_REGISTRY, register_tuya_quirks
from tuya_sharing import (
CustomerDevice,
Manager,
@@ -1,7 +1,7 @@
{
"domain": "yardian",
"name": "Yardian",
"codeowners": ["@aeon-matrix"],
"codeowners": ["@h3l1o5"],
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/yardian",
"integration_type": "device",
-1
View File
@@ -541,7 +541,6 @@ FLOWS = {
"opentherm_gw",
"openuv",
"openweathermap",
"opnsense",
"opower",
"oralb",
"orvibo",
+1 -1
View File
@@ -5119,7 +5119,7 @@
"opnsense": {
"name": "OPNsense",
"integration_type": "hub",
"config_flow": true,
"config_flow": false,
"iot_class": "local_polling"
},
"opower": {
+2 -2
View File
@@ -30,12 +30,12 @@ certifi>=2021.5.30
ciso8601==2.3.3
cronsim==2.7
cryptography==48.0.0
dbus-fast==5.0.15
dbus-fast==5.0.14
file-read-backwards==2.0.0
fnv-hash-fast==2.0.3
go2rtc-client==0.4.0
ha-ffmpeg==3.2.2
habluetooth==6.7.9
habluetooth==6.7.4
hass-nabucasa==2.2.0
hassil==3.5.0
home-assistant-bluetooth==2.0.0
+3 -3
View File
@@ -797,7 +797,7 @@ datadog==0.52.0
datapoint==0.12.1
# homeassistant.components.bluetooth
dbus-fast==5.0.15
dbus-fast==5.0.14
# homeassistant.components.debugpy
debugpy==1.8.17
@@ -1213,7 +1213,7 @@ ha-xthings-cloud==1.0.5
habiticalib==0.4.7
# homeassistant.components.bluetooth
habluetooth==6.7.9
habluetooth==6.7.4
# homeassistant.components.hanna
hanna-cloud==0.0.7
@@ -2839,7 +2839,7 @@ qbittorrent-api==2026.5.1
qbusmqttapi==1.5.0
# homeassistant.components.qingping
qingping-ble==1.1.5
qingping-ble==1.1.4
# homeassistant.components.qnap
qnapstats==0.4.0
+397 -62
View File
@@ -2,9 +2,14 @@
"""Helper script to split test into n buckets."""
import argparse
from collections.abc import Iterator
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field
from contextlib import suppress
from dataclasses import dataclass, field, replace
import hashlib
import json
from math import ceil
from operator import attrgetter, itemgetter
import os
from pathlib import Path
import subprocess
@@ -15,13 +20,21 @@ from typing import Final
# place to subdivide to keep each pytest invocation roughly equal in size.
_FAN_OUT_DIRS: Final = frozenset({"components"})
# Cache file format version; bump on any incompatible schema change so old
# caches are ignored rather than misread.
_CACHE_VERSION: Final = 1
# Fall back from file-level to directory-level pytest collection when
# misses make up more than this fraction of the tree; past that point
# the per-file argv overhead pytest pays outweighs the cost of letting
# it re-walk dirs and re-collect the hits.
_DIR_LEVEL_MISS_RATIO: Final = 0.3
class Bucket:
"""Class to hold bucket."""
def __init__(
self,
):
def __init__(self) -> None:
"""Initialize bucket."""
self.total_tests = 0
self._paths: list[str] = []
@@ -47,43 +60,55 @@ class BucketHolder:
self._buckets: list[Bucket] = [Bucket() for _ in range(bucket_count)]
def split_tests(self, test_folder: TestFolder) -> None:
"""Split tests into buckets."""
"""Place atomic units via best-fit; oversized ones go to the smallest bucket."""
digits = len(str(test_folder.total_tests))
sorted_tests = sorted(
test_folder.get_all_flatten(), reverse=True, key=lambda x: x.total_tests
)
for tests in sorted_tests:
if tests.added_to_bucket:
# Already added to bucket
continue
by_load = attrgetter("total_tests")
units = sorted(self._atomic_units(test_folder), key=itemgetter(0), reverse=True)
for size, items in units:
fits = [
b
for b in self._buckets
if b.total_tests + size <= self._tests_per_bucket
]
bucket = max(fits, key=by_load) if fits else min(self._buckets, key=by_load)
for item in items:
tag = " (same bucket)" if item is not items[0] else ""
print(f"{item.total_tests:>{digits}} tests in {item.path}{tag}")
bucket.add(item)
print(f"{tests.total_tests:>{digits}} tests in {tests.path}")
smallest_bucket = min(self._buckets, key=lambda x: x.total_tests)
is_file = isinstance(tests, TestFile)
if (
smallest_bucket.total_tests + tests.total_tests < self._tests_per_bucket
) or is_file:
smallest_bucket.add(tests)
# Ensure all files from the same folder are in the same bucket
# to ensure that syrupy correctly identifies unused snapshots
if is_file:
for other_test in tests.parent.children.values():
if other_test is tests or isinstance(other_test, TestFolder):
continue
print(
f"{other_test.total_tests:>{digits}}"
f" tests in {other_test.path}"
" (same bucket)"
)
smallest_bucket.add(other_test)
# verify that all tests are added to a bucket
if not test_folder.added_to_bucket:
raise ValueError("Not all tests are added to a bucket")
def create_ouput_file(self) -> None:
def _atomic_units(
self, folder: TestFolder
) -> Iterator[tuple[int, list[TestFolder | TestFile]]]:
"""Yield ``(size, items)`` placement units.
A folder that fits is one unit; otherwise same-dir files form
a unit only when the folder has syrupy snapshots, else each
file stands alone. Sub-folders recurse independently.
"""
if folder.total_tests <= self._tests_per_bucket:
yield folder.total_tests, [folder]
return
sibling_files = [c for c in folder.children.values() if isinstance(c, TestFile)]
if sibling_files:
if _has_snapshots(folder.path):
yield (
sum(f.total_tests for f in sibling_files),
list(sibling_files),
)
else:
for file in sibling_files:
yield file.total_tests, [file]
for child in folder.children.values():
if isinstance(child, TestFolder):
yield from self._atomic_units(child)
def create_output_file(self) -> None:
"""Create output file."""
with Path("pytest_buckets.txt").open("w") as file:
with Path("pytest_buckets.txt").open("w", encoding="utf-8") as file:
for idx, bucket in enumerate(self._buckets):
print(f"Bucket {idx + 1} has {bucket.total_tests} tests")
file.write(bucket.get_paths_line())
@@ -170,6 +195,15 @@ class TestFolder:
return result
def _has_snapshots(folder_path: Path) -> bool:
"""Return True when ``folder_path/snapshots`` holds ``.ambr`` files.
Same-dir tests must share a pytest run so syrupy can spot unused
snapshots; without snapshots that constraint doesn't apply.
"""
return any((folder_path / "snapshots").glob("*.ambr"))
def _collect_batch(paths: list[Path]) -> tuple[str, str, int]:
"""Run pytest --collect-only on a batch of paths."""
result = subprocess.run(
@@ -216,44 +250,339 @@ def _enumerate_batch_paths(path: Path) -> list[Path]:
return paths
def collect_tests(path: Path) -> TestFolder:
"""Collect all tests."""
batch_paths = _enumerate_batch_paths(path)
if not batch_paths:
print(f"No eligible test paths found under {path}")
sys.exit(1)
workers = min(len(batch_paths), os.cpu_count() or 1) or 1
# Round-robin chunking keeps batches roughly balanced when path
# ordering correlates with test size.
batches = [batch_paths[i::workers] for i in range(workers)]
def _hash_file(path: Path) -> str:
"""Return a short content hash for ``path``."""
return hashlib.sha256(path.read_bytes()).hexdigest()[:16]
def _walk_test_tree(root: Path) -> tuple[list[Path], list[Path]]:
"""Walk ``root`` once and return (test files, fixture files).
Fixtures are every non-``test_*.py`` ``.py``: conftests and helpers
like ``common.py`` that drive parametrize imports. Uses ``os.walk``
(~2x faster than ``Path.rglob`` on this tree) and prunes ``.``/``_``
subdirs.
"""
test_files: list[Path] = []
fixtures: list[Path] = []
for dirpath, dirnames, filenames in os.walk(root):
dirnames[:] = [d for d in dirnames if not d.startswith((".", "_"))]
base = Path(dirpath)
for name in filenames:
if not name.endswith(".py"):
continue
if name.startswith("test_"):
test_files.append(base / name)
else:
fixtures.append(base / name)
test_files.sort()
fixtures.sort()
return test_files, fixtures
_PROJECT_ROOT_MARKER: Final = "pyproject.toml"
def _find_ancestor_fixtures(root: Path) -> list[Path]:
"""Return non-``test_*.py`` Python files above ``root``, up to the project root.
Includes conftests and helper modules (eg ``common.py``); subtree
runs need both so shared ancestor helpers like
``tests/components/common.py`` still invalidate descendants.
Stops at the first ancestor containing ``pyproject.toml`` so we
don't read unrelated ``.py`` files outside the repo or trip on
dirs we can't list.
"""
fixtures: list[Path] = []
current = root.resolve().parent
while True:
with suppress(OSError):
fixtures.extend(
entry
for entry in current.glob("*.py")
if not entry.name.startswith("test_")
)
if (current / _PROJECT_ROOT_MARKER).exists():
break
if current == current.parent:
break
current = current.parent
return fixtures
def _build_fixtures_by_dir(
root: Path, descendants: list[Path]
) -> dict[Path, list[Path]]:
"""Bucket descendants plus ancestor fixtures by resolved parent dir."""
by_dir: dict[Path, list[Path]] = {}
for fixture in (*_find_ancestor_fixtures(root), *descendants):
by_dir.setdefault(fixture.parent.resolve(), []).append(fixture)
return by_dir
def _file_fixture_hash(
test_file: Path,
root: Path,
fixtures_by_dir: dict[Path, list[Path]],
blob_cache: dict[Path, bytes] | None = None,
dir_cache: dict[Path, str] | None = None,
) -> str:
"""Hash every ``.py`` fixture on the test file's ancestor path.
Catches conftests and helper modules (``common.py`` etc.) at any
level so parametrize imports from shared helpers invalidate
descendants, while sibling subtrees stay warm. Pass shared
``blob_cache``/``dir_cache`` dicts to memoize across many files.
"""
test_dir = test_file.parent.resolve()
if dir_cache is not None and (cached := dir_cache.get(test_dir)) is not None:
return cached
relevant: list[Path] = []
current = test_dir
while True:
relevant.extend(fixtures_by_dir.get(current, ()))
parent = current.parent
if parent == current:
break
current = parent
relevant.sort()
digest = hashlib.sha256()
for fixture in relevant:
blob = blob_cache.get(fixture) if blob_cache is not None else None
if blob is None:
# relpath keeps the hash machine-stable across ancestor paths.
blob = (
os.path.relpath(fixture, root).encode()
+ b"\0"
+ fixture.read_bytes()
+ b"\0"
)
if blob_cache is not None:
blob_cache[fixture] = blob
digest.update(blob)
result = digest.hexdigest()[:16]
if dir_cache is not None:
dir_cache[test_dir] = result
return result
@dataclass
class _CacheEntry:
"""Cached test count plus its scope hash for a single file."""
hash: str
fixture_hash: str
count: int
@dataclass
class _Cache:
"""Mapping of test file path → cached entry."""
entries: dict[str, _CacheEntry]
@classmethod
def load(cls, path: Path) -> _Cache:
"""Load cache; any drift (missing, bad, version, malformed) returns empty."""
try:
raw = json.loads(path.read_bytes())
except OSError, ValueError:
raw = None
if not (
isinstance(raw, dict)
and raw.get("version") == _CACHE_VERSION
and isinstance(raw.get("files"), dict)
):
return cls(entries={})
entries: dict[str, _CacheEntry] = {}
for key, value in raw["files"].items():
if not isinstance(value, dict):
continue
hash_value = value.get("hash")
fixture_hash = value.get("fixture_hash")
count = value.get("count")
if (
not isinstance(hash_value, str)
or not isinstance(fixture_hash, str)
or not isinstance(count, int)
or count < 0
):
continue
entries[key] = _CacheEntry(
hash=hash_value, fixture_hash=fixture_hash, count=count
)
return cls(entries=entries)
def save(self, path: Path) -> None:
"""Write the cache to ``path``, creating parent dirs as needed."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
json.dumps(
{
"version": _CACHE_VERSION,
"files": {
key: {
"hash": entry.hash,
"fixture_hash": entry.fixture_hash,
"count": entry.count,
}
for key, entry in sorted(self.entries.items())
},
},
indent=2,
ensure_ascii=False,
)
+ "\n",
encoding="utf-8",
)
def _resolve_entries(
test_files: list[Path],
cache: _Cache,
root: Path,
fixtures_by_dir: dict[Path, list[Path]],
) -> tuple[dict[Path, _CacheEntry], list[Path]]:
"""Build an entry for every file; return ``(entries, misses)``.
Hits reuse the stored entry; misses get fresh hashes with a
count=0 placeholder for the caller to fill in after pytest runs.
Shared caches memoize fixture blobs and per-dir hashes so each
fixture file is read once and each unique dir hashed once.
"""
blob_cache: dict[Path, bytes] = {}
dir_cache: dict[Path, str] = {}
entries: dict[Path, _CacheEntry] = {}
misses: list[Path] = []
for file in test_files:
file_hash = _hash_file(file)
fixture_hash = _file_fixture_hash(
file, root, fixtures_by_dir, blob_cache, dir_cache
)
cached = cache.entries.get(str(file.relative_to(root)))
if (
cached is not None
and cached.hash == file_hash
and cached.fixture_hash == fixture_hash
):
entries[file] = cached
else:
entries[file] = _CacheEntry(
hash=file_hash, fixture_hash=fixture_hash, count=0
)
misses.append(file)
return entries, misses
def _run_collect_batches(paths: list[Path]) -> list[tuple[str, str, int]]:
"""Run pytest --collect-only across ``paths`` using a process pool."""
workers = min(len(paths), os.cpu_count() or 1) or 1
batches = [paths[i::workers] for i in range(workers)]
if workers == 1:
results = [_collect_batch(batches[0])]
else:
with ProcessPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(_collect_batch, batches))
return [_collect_batch(batches[0])]
with ProcessPoolExecutor(max_workers=workers) as executor:
return list(executor.map(_collect_batch, batches))
folder = TestFolder(path)
for stdout, stderr, returncode in results:
def _parse_collect_output(stdout: str) -> dict[Path, int]:
"""Parse ``pytest --collect-only -qq`` output into ``{path: count}``."""
counts: dict[Path, int] = {}
for line in stdout.splitlines():
if not line.strip():
continue
file_path, _, total_tests = line.partition(": ")
if not file_path or not total_tests:
raise ValueError(f"Unexpected line: {line}")
counts[Path(file_path)] = int(total_tests)
return counts
def _run_pytest_collect(paths: list[Path]) -> dict[Path, int]:
"""Run pytest --collect-only across ``paths`` and parse the output."""
counts: dict[Path, int] = {}
for stdout, stderr, returncode in _run_collect_batches(paths):
if returncode != 0:
print("Failed to collect tests:")
print(stderr)
print(stdout)
sys.exit(1)
for line in stdout.splitlines():
if not line.strip():
continue
file_path, _, total_tests = line.partition(": ")
if not file_path or not total_tests:
print(f"Unexpected line: {line}")
sys.exit(1)
# Surface stderr from successful runs too; pytest puts deprecation
# and import warnings here that would otherwise vanish.
if stderr.strip():
sys.stderr.write(stderr)
try:
counts.update(_parse_collect_output(stdout))
except ValueError as err:
print(err)
sys.exit(1)
return counts
file = TestFile(int(total_tests), Path(file_path))
folder.add_test_file(file)
def _build_folder(root: Path, counts: dict[Path, int]) -> TestFolder:
"""Build a ``TestFolder`` from ``{path: count}``; zero-count files are skipped."""
folder = TestFolder(root)
for file_path, count in counts.items():
if count:
folder.add_test_file(TestFile(count, file_path))
return folder
def _exit_if_empty(paths: list[Path], root: Path) -> None:
"""Exit with a clear message when no eligible test paths were found."""
if not paths:
print(f"No eligible test paths found under {root}")
sys.exit(1)
def _collect_tests_uncached(path: Path) -> TestFolder:
"""Hand pytest the top-level dirs; the pre-cache path when ``--cache`` is unset."""
batch_paths = _enumerate_batch_paths(path)
_exit_if_empty(batch_paths, path)
return _build_folder(path, _run_pytest_collect(batch_paths))
def _collect_tests_cached(path: Path, cache_path: Path) -> TestFolder:
"""Collect tests using an on-disk cache for incremental updates."""
all_test_files, fixtures = _walk_test_tree(path)
_exit_if_empty(all_test_files, path)
fixtures_by_dir = _build_fixtures_by_dir(path, fixtures)
cache = _Cache.load(cache_path)
entries, misses = _resolve_entries(all_test_files, cache, path, fixtures_by_dir)
hits = len(all_test_files) - len(misses)
print(f"Cache: {hits} hits / {len(misses)} misses / {len(all_test_files)} total")
if misses:
# Past _DIR_LEVEL_MISS_RATIO the per-file argv overhead beats
# re-walking the dirs, so fall back to dir-level collection.
if not hits or len(misses) > len(all_test_files) * _DIR_LEVEL_MISS_RATIO:
collect_paths = _enumerate_batch_paths(path)
else:
collect_paths = misses
new_counts = _run_pytest_collect(collect_paths)
# Files pytest returned no count for stay at 0; cached so they
# aren't re-collected next run.
for file in misses:
entries[file] = replace(entries[file], count=new_counts.get(file, 0))
_Cache(entries={str(f.relative_to(path)): e for f, e in entries.items()}).save(
cache_path
)
return _build_folder(path, {f: e.count for f, e in entries.items()})
def collect_tests(path: Path, cache_path: Path | None = None) -> TestFolder:
"""Collect all tests, using an on-disk cache when ``cache_path`` is set."""
if path.is_file():
# TestFolder requires a directory root; a single file has no
# parent to anchor against and CI never invokes us this way.
print(f"Expected a directory, got file: {path}")
sys.exit(1)
if cache_path is None:
return _collect_tests_uncached(path)
return _collect_tests_cached(path, cache_path)
def main() -> None:
"""Execute script."""
parser = argparse.ArgumentParser(description="Split tests into n buckets.")
@@ -276,11 +605,17 @@ def main() -> None:
help="Path to the test files to split into buckets",
type=Path,
)
parser.add_argument(
"--cache",
help="Path to a JSON file used to cache per-file test counts",
type=Path,
default=None,
)
arguments = parser.parse_args()
print("Collecting tests...")
tests = collect_tests(arguments.path)
tests = collect_tests(arguments.path, arguments.cache)
tests_per_bucket = ceil(tests.total_tests / arguments.bucket_count)
bucket_holder = BucketHolder(tests_per_bucket, arguments.bucket_count)
@@ -290,7 +625,7 @@ def main() -> None:
print(f"Total tests: {tests.total_tests}")
print(f"Estimated tests per bucket: {tests_per_bucket}")
bucket_holder.create_ouput_file()
bucket_holder.create_output_file()
if __name__ == "__main__":
-3
View File
@@ -2,7 +2,6 @@
import asyncio
from datetime import timedelta
import sys
import time
from typing import Any
from unittest.mock import AsyncMock, MagicMock, Mock, patch
@@ -100,7 +99,6 @@ async def test_setup_and_stop(
assert len(mock_bleak_scanner_start.mock_calls) == 1
@pytest.mark.skipif(sys.platform != "linux", reason="Requires Linux BlueZ scanner")
@pytest.mark.parametrize(
"options",
[{CONF_MODE: "passive"}, {CONF_PASSIVE: True}],
@@ -163,7 +161,6 @@ async def test_setup_and_stop_passive(
}
@pytest.mark.skipif(sys.platform != "linux", reason="Requires Linux BlueZ scanner")
async def test_setup_and_stop_old_bluez(
hass: HomeAssistant,
mock_bleak_scanner_start: MagicMock,
+1 -63
View File
@@ -28,7 +28,7 @@ from homeassistant.const import (
)
from homeassistant.core import HomeAssistant, State, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import discovery, issue_registry as ir
from homeassistant.helpers import discovery
from homeassistant.helpers.discovery import DiscoveryInfoType
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.json import JSONEncoder
@@ -821,65 +821,3 @@ async def test_modern_platform_setup(hass: HomeAssistant) -> None:
"in_zones": [],
"source_type": SourceType.ROUTER,
}
async def test_unsupported_legacy_config_creates_issue(
hass: HomeAssistant,
issue_registry: ir.IssueRegistry,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Test unsupported legacy config creates issue."""
integration_domain = "test"
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
hass.async_create_task(
discovery.async_load_platform(
hass, "device_tracker", integration_domain, {}, config
)
)
return True
mock_integration(
hass,
MockModule(integration_domain, async_setup=async_setup),
)
mock_platform(
hass,
f"{integration_domain}.device_tracker",
MockPlatform(),
)
await async_setup_component(hass, "homeassistant", {})
await async_setup_component(
hass,
device_tracker.DOMAIN,
{device_tracker.DOMAIN: {"platform": integration_domain, "something": "value"}},
)
await async_setup_component(hass, integration_domain, {})
await hass.async_block_till_done()
await hass.async_start()
await hass.async_block_till_done()
assert len(hass.states.async_all(device_tracker.DOMAIN)) == 0
assert (
f"The {integration_domain} platform for the {device_tracker.DOMAIN} integration does not support platform"
" setup, please remove it from your config" in caplog.text
)
issue = issue_registry.async_get_issue(
"homeassistant",
f"platform_integration_no_support_{device_tracker.DOMAIN}_{integration_domain}",
)
assert issue
assert issue.issue_domain == integration_domain
assert issue.learn_more_url is None
assert issue.translation_key == "platform_setup_not_supported"
assert issue.severity == ir.IssueSeverity.ERROR
assert issue.translation_placeholders == {
"platform_domain": device_tracker.DOMAIN,
"integration_domain": integration_domain,
"platform_key": f"platform: {integration_domain}",
"yaml_example": f"```yaml\n{device_tracker.DOMAIN}:\n - platform: {integration_domain}\n```",
}
@@ -18,7 +18,7 @@
}),
'ulid-conversation': dict({
'data': dict({
'chat_model': 'models/gemini-3.1-flash-lite',
'chat_model': 'models/gemini-2.5-flash',
'dangerous_block_threshold': 'BLOCK_MEDIUM_AND_ABOVE',
'harassment_block_threshold': 'BLOCK_MEDIUM_AND_ABOVE',
'hate_block_threshold': 'BLOCK_MEDIUM_AND_ABOVE',
@@ -21,7 +21,7 @@
'labels': set({
}),
'manufacturer': 'Google',
'model': 'gemini-3.1-flash-lite',
'model': 'gemini-2.5-flash',
'model_id': None,
'name': 'Google AI Conversation',
'name_by_user': None,
@@ -50,7 +50,7 @@
'labels': set({
}),
'manufacturer': 'Google',
'model': 'gemini-3.1-flash-lite',
'model': 'gemini-2.5-flash',
'model_id': None,
'name': 'Google AI STT',
'name_by_user': None,
@@ -108,7 +108,7 @@
'labels': set({
}),
'manufacturer': 'Google',
'model': 'gemini-3.1-flash-lite',
'model': 'gemini-2.5-flash',
'model_id': None,
'name': 'Google AI Task',
'name_by_user': None,
@@ -51,7 +51,7 @@ def get_models_pager():
model_25_flash = Mock(
supported_actions=["generateContent"],
)
model_25_flash.name = "models/gemini-3.1-flash-lite"
model_25_flash.name = "models/gemini-2.5-flash"
model_20_flash = Mock(
supported_actions=["generateContent"],
@@ -21,7 +21,7 @@ from . import API_ERROR_500, CLIENT_ERROR_BAD_REQUEST
from tests.common import MockConfigEntry
TEST_CHAT_MODEL = "models/gemini-3.1-flash-lite"
TEST_CHAT_MODEL = "models/gemini-2.5-flash"
TEST_PROMPT = "Please transcribe the audio."
@@ -1,57 +0,0 @@
# serializer version: 1
# name: test_states
set({
StateSnapshot({
'attributes': ReadOnlyDict({
'editable': True,
'friendly_name': 'test home',
'icon': 'mdi:home',
'latitude': 32.87336,
'longitude': -117.22743,
'passive': False,
'persons': list([
]),
'radius': 100,
}),
'context': <ANY>,
'entity_id': 'zone.home',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': '0',
}),
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Demo scanner',
'in_zones': list([
'zone.home',
]),
'source_type': <SourceType.ROUTER: 'router'>,
}),
'context': <ANY>,
'entity_id': 'device_tracker.demo_scanner',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'home',
}),
StateSnapshot({
'attributes': ReadOnlyDict({
'friendly_name': 'Demo tracker',
'gps_accuracy': 10,
'in_zones': list([
'zone.home',
]),
'latitude': 32.87336,
'longitude': -117.22743,
'source_type': <SourceType.GPS: 'gps'>,
}),
'context': <ANY>,
'entity_id': 'device_tracker.demo_tracker',
'last_changed': <ANY>,
'last_reported': <ANY>,
'last_updated': <ANY>,
'state': 'home',
}),
})
# ---
@@ -1,146 +0,0 @@
"""The tests for the kitchen_sink device_tracker platform."""
from collections.abc import Generator
from unittest.mock import patch
import pytest
from syrupy.assertion import SnapshotAssertion
from homeassistant.components.device_tracker import ATTR_SOURCE_TYPE, SourceType
from homeassistant.components.kitchen_sink import DOMAIN
from homeassistant.components.kitchen_sink.services import (
ATTR_ACCURACY,
ATTR_CONNECTED,
SERVICE_SET_SCANNER_CONNECTED,
SERVICE_SET_TRACKER_LOCATION,
)
from homeassistant.const import (
ATTR_ENTITY_ID,
ATTR_GPS_ACCURACY,
ATTR_LATITUDE,
ATTR_LONGITUDE,
STATE_HOME,
STATE_NOT_HOME,
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
TRACKER_ENTITY_ID = "device_tracker.demo_tracker"
SCANNER_ENTITY_ID = "device_tracker.demo_scanner"
@pytest.fixture
def device_tracker_only() -> Generator[None]:
"""Enable only the device_tracker platform."""
with patch(
"homeassistant.components.kitchen_sink.COMPONENTS_WITH_DEMO_PLATFORM",
[Platform.DEVICE_TRACKER],
):
yield
@pytest.fixture(autouse=True)
async def setup_comp(hass: HomeAssistant, device_tracker_only: None) -> None:
"""Set up demo component."""
hass.config.latitude = 32.87336
hass.config.longitude = -117.22743
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
await hass.async_block_till_done()
async def test_states(hass: HomeAssistant, snapshot: SnapshotAssertion) -> None:
"""Test the expected device_tracker entities are added."""
states = hass.states.async_all()
assert set(states) == snapshot
async def test_set_tracker_location(hass: HomeAssistant) -> None:
"""Test the set_tracker_location service updates tracker attributes."""
state = hass.states.get(TRACKER_ENTITY_ID)
assert state is not None
assert state.attributes[ATTR_SOURCE_TYPE] == SourceType.GPS
await hass.services.async_call(
DOMAIN,
SERVICE_SET_TRACKER_LOCATION,
{
ATTR_ENTITY_ID: TRACKER_ENTITY_ID,
ATTR_LATITUDE: 12.34,
ATTR_LONGITUDE: 56.78,
ATTR_ACCURACY: 42,
},
blocking=True,
)
state = hass.states.get(TRACKER_ENTITY_ID)
assert state.attributes[ATTR_LATITUDE] == 12.34
assert state.attributes[ATTR_LONGITUDE] == 56.78
assert state.attributes[ATTR_GPS_ACCURACY] == 42
assert state.state == STATE_NOT_HOME
async def test_set_scanner_connected(hass: HomeAssistant) -> None:
"""Test the set_scanner_connected service updates scanner state."""
state = hass.states.get(SCANNER_ENTITY_ID)
assert state is not None
assert state.state == STATE_HOME
assert state.attributes[ATTR_SOURCE_TYPE] == SourceType.ROUTER
await hass.services.async_call(
DOMAIN,
SERVICE_SET_SCANNER_CONNECTED,
{ATTR_ENTITY_ID: SCANNER_ENTITY_ID, ATTR_CONNECTED: False},
blocking=True,
)
state = hass.states.get(SCANNER_ENTITY_ID)
assert state.state == STATE_NOT_HOME
await hass.services.async_call(
DOMAIN,
SERVICE_SET_SCANNER_CONNECTED,
{ATTR_ENTITY_ID: SCANNER_ENTITY_ID, ATTR_CONNECTED: True},
blocking=True,
)
state = hass.states.get(SCANNER_ENTITY_ID)
assert state.state == STATE_HOME
async def test_set_tracker_location_on_scanner_raises(hass: HomeAssistant) -> None:
"""Calling set_tracker_location on the scanner surfaces an AttributeError.
The service is registered for the device_tracker domain and dispatches by
method name, so targeting the scanner (which has no async_set_tracker_location)
bubbles up the missing-attribute error from the entity.
"""
with pytest.raises(
AttributeError,
match="'DemoScanner' object has no attribute 'async_set_tracker_location'",
):
await hass.services.async_call(
DOMAIN,
SERVICE_SET_TRACKER_LOCATION,
{
ATTR_ENTITY_ID: SCANNER_ENTITY_ID,
ATTR_LATITUDE: 12.34,
ATTR_LONGITUDE: 56.78,
ATTR_ACCURACY: 42,
},
blocking=True,
)
async def test_set_scanner_connected_on_tracker_raises(hass: HomeAssistant) -> None:
"""Calling set_scanner_connected on the tracker surfaces an AttributeError."""
with pytest.raises(
AttributeError,
match="'DemoTracker' object has no attribute 'async_set_scanner_connected'",
):
await hass.services.async_call(
DOMAIN,
SERVICE_SET_SCANNER_CONNECTED,
{ATTR_ENTITY_ID: TRACKER_ENTITY_ID, ATTR_CONNECTED: False},
blocking=True,
)
+1 -1
View File
@@ -369,7 +369,7 @@ async def test_service(
await hass.services.async_call(
DOMAIN,
"test_service_1",
{"field_1": 1, "field_2": "auto", "field_3": 1, "field_4": "forward"},
{"field_1": 1, "field_2": "auto", "field_3": 1, "field_4": "forwards"},
blocking=True,
)
-59
View File
@@ -1,59 +0,0 @@
"""OPNsense session fixtures."""
from collections.abc import Generator
from unittest.mock import AsyncMock, patch
import pytest
from homeassistant.components.opnsense.const import DOMAIN
from homeassistant.core import HomeAssistant
from .const import ARP, CONFIG_DATA, INTERFACES
from tests.common import MockConfigEntry
CONF_OPNSENSE_CLIENT = "opnsense_client"
@pytest.fixture
def mock_config_entry(
hass: HomeAssistant, mock_opnsense_client: AsyncMock
) -> MockConfigEntry:
"""Return the default mocked config entry."""
mock_config_entry = MockConfigEntry(
domain=DOMAIN,
data=CONFIG_DATA,
unique_id="mocked_unique_id",
)
mock_config_entry.add_to_hass(hass)
return mock_config_entry
@pytest.fixture
def mock_opnsense_client() -> Generator[AsyncMock]:
"""Override OPNsenseClient in both config_flow and component."""
with (
patch(
"homeassistant.components.opnsense.config_flow.OPNsenseClient",
autospec=True,
) as mock_client,
patch(
"homeassistant.components.opnsense.OPNsenseClient",
new=mock_client,
),
):
client = mock_client.return_value
client.get_host_firmware_version.return_value = "25.7.8"
client.get_arp_table.return_value = ARP
client.get_interfaces.return_value = INTERFACES
client.get_device_unique_id.return_value = "mocked_unique_id"
yield client
@pytest.fixture
def mock_setup_entry() -> Generator[None]:
"""Mock setting up a config entry."""
with patch(
"homeassistant.components.opnsense.async_setup_entry", return_value=True
):
yield
-42
View File
@@ -1,42 +0,0 @@
"""Constants for opnsense tests."""
from homeassistant.components.opnsense.const import (
CONF_API_SECRET,
CONF_TRACKER_INTERFACES,
)
from homeassistant.const import CONF_API_KEY, CONF_URL, CONF_VERIFY_SSL
TITLE = "OPNsense"
CONFIG_DATA = {
CONF_URL: "http://router.lan/api",
CONF_API_KEY: "key",
CONF_API_SECRET: "secret",
CONF_VERIFY_SSL: False,
}
CONFIG_DATA_IMPORT = {
CONF_URL: "http://router.lan/api",
CONF_API_KEY: "key",
CONF_API_SECRET: "secret",
CONF_VERIFY_SSL: False,
CONF_TRACKER_INTERFACES: ["LAN"],
}
ARP = [
{
"hostname": "",
"intf": "igb1",
"intf_description": "LAN",
"ip": "192.168.0.123",
"mac": "ff:ff:ff:ff:ff:ff",
"manufacturer": "",
},
{
"hostname": "Desktop",
"intf": "igb1",
"intf_description": "LAN",
"ip": "192.168.0.167",
"mac": "ff:ff:ff:ff:ff:fe",
"manufacturer": "OEM",
},
]
INTERFACES = {"igb0": {"name": "WAN"}, "igb1": {"name": "LAN"}}
@@ -1,325 +0,0 @@
"""Tests for the OPNsense config flow."""
from unittest.mock import AsyncMock
from aiopnsense import (
OPNsenseBelowMinFirmware,
OPNsenseConnectionError,
OPNsenseInvalidAuth,
OPNsenseInvalidURL,
OPNsensePrivilegeMissing,
OPNsenseTimeoutError,
)
import pytest
from homeassistant.components.opnsense import OPNsenseSSLError, OPNsenseUnknownFirmware
from homeassistant.components.opnsense.const import CONF_TRACKER_INTERFACES, DOMAIN
from homeassistant.config_entries import SOURCE_IMPORT, SOURCE_USER
from homeassistant.const import CONF_URL, CONF_VERIFY_SSL
from homeassistant.core import DOMAIN as HOMEASSISTANT_DOMAIN, HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import issue_registry as ir
from .const import CONFIG_DATA, CONFIG_DATA_IMPORT
# Constants for test values
TEST_URL = "http://router.lan/api"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_user(hass: HomeAssistant, mock_opnsense_client: AsyncMock) -> None:
"""Test user config."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "user"
# Submit user step, should go to interfaces step
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=CONFIG_DATA,
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "interfaces"
# Submit interfaces step (simulate user selecting all interfaces)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_TRACKER_INTERFACES: []},
)
assert result.get("type") is FlowResultType.CREATE_ENTRY
assert result.get("title") == CONFIG_DATA[CONF_URL]
assert result.get("data") == CONFIG_DATA
assert result["result"].unique_id == "mocked_unique_id"
@pytest.mark.usefixtures("mock_setup_entry")
@pytest.mark.parametrize(
("exc", "expected"),
[
(OPNsenseInvalidAuth, "invalid_auth"),
(OPNsensePrivilegeMissing, "privilege_missing"),
(OPNsenseInvalidURL, "invalid_url"),
(OPNsenseSSLError, "ssl_error"),
(OPNsenseConnectionError, "cannot_connect"),
(OPNsenseTimeoutError, "cannot_connect"),
(OPNsenseUnknownFirmware, "unknown_version"),
(OPNsenseBelowMinFirmware, "invalid_version"),
],
)
async def test_user_exceptions(
hass: HomeAssistant,
mock_opnsense_client: AsyncMock,
exc: type[Exception],
expected: str,
) -> None:
"""Test all exception branches in async_step_user."""
mock_opnsense_client.validate.side_effect = exc
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=CONFIG_DATA
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": expected}
mock_opnsense_client.validate.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=CONFIG_DATA
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
@pytest.mark.usefixtures("mock_setup_entry", "mock_config_entry")
async def test_user_unique_id_already_configured(
hass: HomeAssistant, mock_opnsense_client: AsyncMock
) -> None:
"""Test user flow aborts when unique ID is already configured."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input=CONFIG_DATA
)
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "already_configured"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_user_no_unique_id_aborts(
hass: HomeAssistant, mock_opnsense_client: AsyncMock
) -> None:
"""Test that the user flow aborts if the router has no unique id."""
mock_opnsense_client.get_device_unique_id.return_value = None
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={**CONFIG_DATA, CONF_URL: TEST_URL},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "no_unique_id"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_on_unknown_error(
hass: HomeAssistant, mock_opnsense_client: AsyncMock
) -> None:
"""Test when we have unknown errors."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_USER},
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "user"
mock_opnsense_client.validate.side_effect = TypeError
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=CONFIG_DATA,
)
assert result.get("type") is FlowResultType.FORM
assert result.get("errors") == {"base": "unknown"}
mock_opnsense_client.validate.side_effect = None
# Submit user step, should go to interfaces step
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input=CONFIG_DATA,
)
assert result.get("type") is FlowResultType.FORM
assert result.get("step_id") == "interfaces"
# Submit interfaces step
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_TRACKER_INTERFACES: []},
)
assert result.get("type") is FlowResultType.CREATE_ENTRY
@pytest.mark.usefixtures("mock_setup_entry")
async def test_interfaces_step_with_tracker_interfaces(
hass: HomeAssistant, mock_opnsense_client: AsyncMock
) -> None:
"""Test interfaces step with tracker_interfaces in user_input (covering the missing branch)."""
# Patch the client to return interfaces
mock_opnsense_client.return_value.get_device_unique_id.return_value = (
"unique_id_789"
)
mock_opnsense_client.return_value.get_interfaces.return_value = {
"LAN": {"name": "LAN"},
"WAN": {"name": "WAN"},
}
# Go through user step
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={**CONFIG_DATA, CONF_VERIFY_SSL: True},
)
# Now submit interfaces step with tracker_interfaces
result = await hass.config_entries.flow.async_configure(
result["flow_id"],
user_input={CONF_TRACKER_INTERFACES: ["LAN", "WAN"]},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["data"][CONF_TRACKER_INTERFACES] == ["LAN", "WAN"]
@pytest.mark.usefixtures("mock_setup_entry")
async def test_import(hass: HomeAssistant, mock_opnsense_client: AsyncMock) -> None:
"""Test import step."""
mock_opnsense_client.return_value.get_device_unique_id.return_value = (
"unique_id_123"
)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=CONFIG_DATA_IMPORT,
)
assert result.get("type") is FlowResultType.CREATE_ENTRY
assert result.get("title") == CONFIG_DATA_IMPORT[CONF_URL]
@pytest.mark.usefixtures(
"mock_opnsense_client", "mock_setup_entry", "mock_config_entry"
)
async def test_import_unique_id_already_configured(
hass: HomeAssistant, issue_registry: ir.IssueRegistry
) -> None:
"""Test import step when unique ID is already configured."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=CONFIG_DATA_IMPORT,
)
assert result.get("type") is FlowResultType.ABORT
assert result.get("reason") == "already_configured"
# The deprecation issue must still be created so the YAML block gets removed
issue = issue_registry.async_get_issue(
HOMEASSISTANT_DOMAIN, f"deprecated_yaml_{DOMAIN}"
)
assert issue is not None
assert issue.translation_key == "deprecated_yaml"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_import_no_unique_id_aborts(
hass: HomeAssistant,
mock_opnsense_client: AsyncMock,
issue_registry: ir.IssueRegistry,
) -> None:
"""Test that the import flow aborts and raises a repair if no unique id."""
mock_opnsense_client.get_device_unique_id.return_value = None
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=CONFIG_DATA_IMPORT,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "no_unique_id"
assert issue_registry.async_get_issue(DOMAIN, "import_failed_no_unique_id")
@pytest.mark.usefixtures("mock_setup_entry")
@pytest.mark.parametrize(
("exc", "reason"),
[
(OPNsenseInvalidURL, "invalid_url"),
(OPNsenseInvalidAuth, "invalid_auth"),
(OPNsensePrivilegeMissing, "privilege_missing"),
(OPNsenseSSLError, "ssl_error"),
(OPNsenseConnectionError, "cannot_connect"),
(OPNsenseTimeoutError, "cannot_connect"),
(OPNsenseUnknownFirmware, "unknown_version"),
(OPNsenseBelowMinFirmware, "invalid_version"),
(Exception, "unknown"),
],
)
async def test_import_exceptions(
hass: HomeAssistant,
mock_opnsense_client: AsyncMock,
issue_registry: ir.IssueRegistry,
exc: type[Exception],
reason: str,
) -> None:
"""Test all exception branches in async_step_import."""
mock_opnsense_client.validate.side_effect = exc
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data=CONFIG_DATA_IMPORT,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == reason
assert issue_registry.async_get_issue(DOMAIN, f"import_failed_{reason}")
@pytest.mark.usefixtures("mock_opnsense_client", "mock_setup_entry")
async def test_import_empty_tracker_interfaces(hass: HomeAssistant) -> None:
"""Test import with empty CONF_TRACKER_INTERFACES (should pop the key)."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={**CONFIG_DATA_IMPORT, CONF_TRACKER_INTERFACES: []},
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert CONF_TRACKER_INTERFACES not in result["data"]
@pytest.mark.usefixtures("mock_setup_entry")
async def test_import_missing_interfaces(
hass: HomeAssistant,
mock_opnsense_client: AsyncMock,
issue_registry: ir.IssueRegistry,
) -> None:
"""Test import with missing tracker interfaces (should create issue and abort)."""
mock_opnsense_client.get_interfaces.return_value = {"LAN": {"name": "LAN"}}
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_IMPORT},
data={**CONFIG_DATA_IMPORT, CONF_TRACKER_INTERFACES: ["MISSING"]},
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "import_failed_missing_interfaces"
assert issue_registry.async_get_issue(DOMAIN, "import_failed_missing_interfaces")
+53 -152
View File
@@ -1,169 +1,70 @@
"""The tests for the opnsense device tracker platform."""
from datetime import timedelta
from unittest import mock
from aiopnsense import OPNsenseConnectionError
from freezegun.api import FrozenDateTimeFactory
import pytest
from homeassistant.components import device_tracker
from homeassistant.components.opnsense import OPNsenseRuntimeData
from homeassistant.components.opnsense.const import DOMAIN
from homeassistant.const import STATE_UNAVAILABLE
from homeassistant.components import opnsense
from homeassistant.components.device_tracker import legacy
from homeassistant.components.opnsense import CONF_API_SECRET, DOMAIN
from homeassistant.const import CONF_API_KEY, CONF_URL, CONF_VERIFY_SSL
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from tests.common import MockConfigEntry, async_fire_time_changed
from homeassistant.setup import async_setup_component
@pytest.mark.usefixtures("mock_opnsense_client")
async def test_device_tracker_setup(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
@pytest.fixture(name="mocked_opnsense")
def mocked_opnsense():
"""Mock for aiopnsense.OPNsenseClient."""
with mock.patch.object(opnsense, "OPNsenseClient") as mocked_opn:
yield mocked_opn
async def test_get_scanner(
hass: HomeAssistant, mocked_opnsense, mock_device_tracker_conf: list[legacy.Device]
) -> None:
"""Test device tracker platform setup."""
# Setup the integration
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
# Check that device tracker entities are created
device_tracker_entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
device_tracker_entities = [
entity
for entity in device_tracker_entities
if entity.domain == device_tracker.DOMAIN
"""Test creating an opnsense scanner."""
opnsense_client = mock.AsyncMock()
mocked_opnsense.return_value = opnsense_client
opnsense_client.get_arp_table.return_value = [
{
"hostname": "",
"intf": "igb1",
"intf_description": "LAN",
"ip": "192.168.0.123",
"mac": "ff:ff:ff:ff:ff:ff",
"manufacturer": "",
},
{
"hostname": "Desktop",
"intf": "igb1",
"intf_description": "LAN",
"ip": "192.168.0.167",
"mac": "ff:ff:ff:ff:ff:fe",
"manufacturer": "OEM",
},
]
# Should have 2 devices from ARP table
assert len(device_tracker_entities) == 2
# Check the unique IDs are correct
entity_unique_ids = {entity.unique_id for entity in device_tracker_entities}
assert "ff:ff:ff:ff:ff:ff" in entity_unique_ids
assert "ff:ff:ff:ff:ff:fe" in entity_unique_ids
@pytest.mark.usefixtures("mock_opnsense_client")
async def test_device_tracker_states(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
entity_registry: er.EntityRegistry,
) -> None:
"""Test device tracker entity states and attributes."""
# Setup the integration
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
device_tracker_entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
device_tracker_entities = [
entity
for entity in device_tracker_entities
if entity.domain == device_tracker.DOMAIN
]
entity_ids_by_unique_id = {
entity.unique_id: entity.entity_id for entity in device_tracker_entities
opnsense_client.get_interfaces.return_value = {
"wan": {"name": "WAN"},
"lan": {"name": "LAN"},
}
# Enable entities (device trackers are disabled by default)
entity_registry.async_update_entity(
entity_ids_by_unique_id["ff:ff:ff:ff:ff:ff"], disabled_by=None
)
entity_registry.async_update_entity(
entity_ids_by_unique_id["ff:ff:ff:ff:ff:fe"], disabled_by=None
)
# Reload the config entry to activate the enabled entities
await hass.config_entries.async_reload(mock_config_entry.entry_id)
await hass.async_block_till_done()
# Test first device (no hostname)
entity_id_1 = entity_ids_by_unique_id["ff:ff:ff:ff:ff:ff"]
state_1 = hass.states.get(entity_id_1)
assert state_1 is not None
assert state_1.state == "home" # Should be connected since it's in ARP table
assert state_1.attributes.get("ip") == "192.168.0.123"
assert state_1.attributes.get("mac") == "ff:ff:ff:ff:ff:ff"
assert state_1.attributes.get("interface") == "LAN"
# Test second device (with hostname and manufacturer)
entity_id_2 = entity_ids_by_unique_id["ff:ff:ff:ff:ff:fe"]
state_2 = hass.states.get(entity_id_2)
assert state_2 is not None
assert state_2.state == "home" # Should be connected since it's in ARP table
assert state_2.attributes.get("ip") == "192.168.0.167"
assert state_2.attributes.get("mac") == "ff:ff:ff:ff:ff:fe"
assert state_2.attributes.get("interface") == "LAN"
assert state_2.attributes.get("manufacturer") == "OEM"
async def test_device_tracker_with_interfaces_filter(
hass: HomeAssistant,
mock_opnsense_client: mock.AsyncMock,
entity_registry: er.EntityRegistry,
) -> None:
"""Test device tracker with interface filtering."""
# Create config entry with interface filtering
mock_config_entry = MockConfigEntry(
domain=DOMAIN,
data={
"url": "http://router.lan/api",
"api_key": "key",
"api_secret": "secret",
"verify_ssl": False,
"tracker_interfaces": ["WAN"], # Filter to only WAN interface
result = await async_setup_component(
hass,
DOMAIN,
{
DOMAIN: {
CONF_URL: "https://fake_host_fun/api",
CONF_API_KEY: "fake_key",
CONF_API_SECRET: "fake_secret",
CONF_VERIFY_SSL: False,
}
},
)
mock_config_entry.runtime_data = OPNsenseRuntimeData(
client=mock_opnsense_client.return_value,
tracker_interfaces=["WAN"],
)
mock_config_entry.add_to_hass(hass)
# Setup the integration
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
# Check that no device tracker entities are created (since all devices are on LAN)
device_tracker_entities = er.async_entries_for_config_entry(
entity_registry, mock_config_entry.entry_id
)
device_tracker_entities = [
entity
for entity in device_tracker_entities
if entity.domain == device_tracker.DOMAIN
]
assert len(device_tracker_entities) == 0
@pytest.mark.usefixtures("entity_registry_enabled_by_default")
async def test_device_tracker_coordinator_update_failure(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_opnsense_client: mock.AsyncMock,
freezer: FrozenDateTimeFactory,
) -> None:
"""Test coordinator wraps client errors as UpdateFailed."""
assert await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert hass.states.get("device_tracker.desktop").state != STATE_UNAVAILABLE
mock_opnsense_client.get_arp_table.side_effect = OPNsenseConnectionError(
"connection failed"
)
freezer.tick(timedelta(seconds=30))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert hass.states.get("device_tracker.desktop").state == STATE_UNAVAILABLE
assert mock_opnsense_client.get_arp_table.call_count == 2
assert result
device_1 = hass.states.get("device_tracker.desktop")
assert device_1 is not None
assert device_1.state == "home"
device_2 = hass.states.get("device_tracker.ff_ff_ff_ff_ff_ff")
assert device_2.state == "home"
-88
View File
@@ -1,88 +0,0 @@
"""Tests for the opnsense integration setup."""
from unittest import mock
from aiopnsense import (
OPNsenseBelowMinFirmware,
OPNsenseConnectionError,
OPNsenseInvalidAuth,
OPNsenseInvalidURL,
OPNsensePrivilegeMissing,
OPNsenseSSLError,
OPNsenseTimeoutError,
OPNsenseUnknownFirmware,
)
import pytest
from homeassistant.components.opnsense.const import (
CONF_API_SECRET,
CONF_TRACKER_INTERFACES,
DOMAIN,
)
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import CONF_API_KEY, CONF_URL, CONF_VERIFY_SSL
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
@pytest.mark.parametrize(
("exc", "expected_state", "expected_translation_key"),
[
(OPNsenseUnknownFirmware, ConfigEntryState.SETUP_ERROR, "unknown_firmware"),
(OPNsenseBelowMinFirmware, ConfigEntryState.SETUP_ERROR, "firmware_too_old"),
(OPNsenseInvalidURL, ConfigEntryState.SETUP_ERROR, "invalid_url"),
(OPNsenseTimeoutError, ConfigEntryState.SETUP_RETRY, "timeout_connecting"),
(OPNsenseSSLError, ConfigEntryState.SETUP_ERROR, "ssl_error"),
(OPNsenseInvalidAuth, ConfigEntryState.SETUP_ERROR, "invalid_auth"),
(OPNsensePrivilegeMissing, ConfigEntryState.SETUP_ERROR, "privilege_missing"),
(OPNsenseConnectionError, ConfigEntryState.SETUP_RETRY, "cannot_connect"),
],
)
async def test_setup_entry_exceptions(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_opnsense_client: mock.AsyncMock,
exc: type[Exception],
expected_state: ConfigEntryState,
expected_translation_key: str,
) -> None:
"""Test async_setup_entry surfaces translation-keyed errors."""
mock_opnsense_client.validate.side_effect = exc
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is expected_state
assert mock_config_entry.error_reason_translation_key == expected_translation_key
assert mock_config_entry.error_reason_translation_placeholders == {
"url": mock_config_entry.data[CONF_URL]
}
async def test_setup_entry_tracker_interface_not_found(
hass: HomeAssistant,
mock_opnsense_client: mock.AsyncMock,
) -> None:
"""Test async_setup_entry rejects unknown tracker interfaces."""
entry = MockConfigEntry(
domain=DOMAIN,
data={
CONF_URL: "http://router.lan/api",
CONF_API_KEY: "key",
CONF_API_SECRET: "secret",
CONF_VERIFY_SSL: False,
CONF_TRACKER_INTERFACES: ["NOPE"],
},
)
entry.add_to_hass(hass)
await hass.config_entries.async_setup(entry.entry_id)
await hass.async_block_till_done()
assert entry.state is ConfigEntryState.SETUP_ERROR
assert entry.error_reason_translation_key == "tracker_interface_not_found"
assert entry.error_reason_translation_placeholders == {
"interface": "NOPE",
"known": "WAN, LAN",
}
+659
View File
@@ -0,0 +1,659 @@
"""Tests for the split_tests cache logic."""
from collections.abc import Callable
import json
from pathlib import Path
from unittest.mock import patch
import pytest
from script import split_tests
@pytest.fixture
def tree(tmp_path: Path) -> Path:
"""Build a tree: root conftest, two integrations, a ``common.py`` helper."""
# Bound the ancestor-fixture walk so it doesn't escape tmp_path.
(tmp_path / "pyproject.toml").write_text("")
(tmp_path / "conftest.py").write_text("# tests/conftest.py\n")
(tmp_path / "common.py").write_text("# helper module\n")
alpha_dir = tmp_path / "components" / "alpha"
alpha_dir.mkdir(parents=True)
(alpha_dir / "conftest.py").write_text("# alpha conftest\n")
(alpha_dir / "test_one.py").write_text("def test_a():\n pass\n")
(alpha_dir / "test_two.py").write_text("def test_b():\n pass\n")
beta_dir = tmp_path / "components" / "beta"
beta_dir.mkdir()
(beta_dir / "test_x.py").write_text("def test_x():\n pass\n")
return tmp_path
def test_iter_eligible_children_filters_helpers(tree: Path) -> None:
"""Helper files like conftest.py and common.py are not collection targets."""
children = split_tests._iter_eligible_children(tree)
names = {p.name for p in children}
assert "common.py" not in names
assert "conftest.py" not in names
# components/ is a dir, gets included.
assert "components" in names
def test_enumerate_batch_paths_fans_out_components(tree: Path) -> None:
"""tests/components fans out one level deeper into per-integration paths."""
paths = split_tests._enumerate_batch_paths(tree)
rel = {p.relative_to(tree).as_posix() for p in paths}
assert rel == {"components/beta", "components/alpha"}
def test_enumerate_batch_paths_for_single_file(tmp_path: Path) -> None:
"""A test file passed directly is returned as-is."""
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
assert split_tests._enumerate_batch_paths(file) == [file]
def _fixture_hash_for(tree: Path, file: Path) -> str:
"""Compute the fixture scope hash for ``file`` rooted at ``tree``."""
_, fixtures = split_tests._walk_test_tree(tree)
fixtures_by_dir = split_tests._build_fixtures_by_dir(tree, fixtures)
return split_tests._file_fixture_hash(file, tree, fixtures_by_dir)
def _prime_cache(
cache_path: Path,
tree: Path,
hits: dict[Path, int] | None = None,
extra_entries: dict[str, split_tests._CacheEntry] | None = None,
) -> None:
"""Save a cache for ``tree`` keyed on real file and fixture hashes.
``hits`` maps file cached count (hashed for real, so the next
run resolves as a hit). ``extra_entries`` injects raw entries
whose path may not exist on disk (eg ghost files).
"""
entries: dict[str, split_tests._CacheEntry] = {
str(file.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(file),
fixture_hash=_fixture_hash_for(tree, file),
count=count,
)
for file, count in (hits or {}).items()
}
if extra_entries:
entries.update(extra_entries)
split_tests._Cache(entries=entries).save(cache_path)
def _echo_one_test_each(
skip: set[Path] | None = None,
) -> Callable[[list[Path]], list[tuple[str, str, int]]]:
"""Fake ``_run_collect_batches``: 1 test per path; ``skip`` paths drop out."""
skip = skip or set()
def fake(paths: list[Path]) -> list[tuple[str, str, int]]:
emitted = [p for p in paths if p not in skip]
return [("\n".join(f"{p}: 1" for p in emitted) + "\n", "", 0)]
return fake
def test_file_fixture_hash_changes_when_ancestor_conftest_changes(tree: Path) -> None:
"""A conftest edit in the file's ancestor chain busts that file's hash."""
alpha_one = tree / "components" / "alpha" / "test_one.py"
before = _fixture_hash_for(tree, alpha_one)
# Same-dir conftest is an ancestor of alpha_one.
(tree / "components" / "alpha" / "conftest.py").write_text("# changed\n")
after = _fixture_hash_for(tree, alpha_one)
assert before != after
def test_file_fixture_hash_changes_when_same_dir_helper_changes(tree: Path) -> None:
"""A non-conftest helper in the same dir busts the file's hash."""
alpha_dir = tree / "components" / "alpha"
(alpha_dir / "common.py").write_text("# helper v1\n")
alpha_one = alpha_dir / "test_one.py"
before = _fixture_hash_for(tree, alpha_one)
(alpha_dir / "common.py").write_text("# helper v2\n")
after = _fixture_hash_for(tree, alpha_one)
assert before != after
def test_file_fixture_hash_isolated_from_sibling_dir(tree: Path) -> None:
"""A helper change in a sibling subtree leaves this file's hash alone."""
alpha_one = tree / "components" / "alpha" / "test_one.py"
before = _fixture_hash_for(tree, alpha_one)
# beta is a sibling of alpha (not an ancestor), so its helper edit
# must not affect alpha_one's fixture hash.
(tree / "components" / "beta" / "common.py").write_text("# beta v2\n")
after = _fixture_hash_for(tree, alpha_one)
assert before == after
def test_file_fixture_hash_changes_when_ancestor_helper_changes(tree: Path) -> None:
"""A helper edit anywhere on the ancestor path busts the file's hash.
Test files often import VALUES for ``@pytest.mark.parametrize`` from
shared helpers like ``tests/components/common.py``; any ancestor
``.py`` change has to invalidate descendants so cached counts don't
drift after edits to those sources.
"""
alpha_one = tree / "components" / "alpha" / "test_one.py"
# Seed a shared helper one level up from alpha.
components_common = tree / "components" / "common.py"
components_common.write_text("# helper v1\n")
before = _fixture_hash_for(tree, alpha_one)
components_common.write_text("# helper v2\n")
after = _fixture_hash_for(tree, alpha_one)
assert before != after
def test_file_fixture_hash_stable_for_test_changes(tree: Path) -> None:
"""Test-file edits do not invalidate the file's fixture hash."""
alpha_one = tree / "components" / "alpha" / "test_one.py"
before = _fixture_hash_for(tree, alpha_one)
alpha_one.write_text("def test_a():\n pass\n\ndef test_c():\n pass\n")
after = _fixture_hash_for(tree, alpha_one)
assert before == after
def test_find_ancestor_fixtures_stops_at_project_root(tmp_path: Path) -> None:
"""A project-root marker bounds the ancestor walk."""
project = tmp_path / "project"
project.mkdir()
(project / "pyproject.toml").write_text("")
(project / "common.py").write_text("# included\n")
nested = project / "tests" / "x"
nested.mkdir(parents=True)
# Above the project root: must NOT be picked up.
(tmp_path / "outside.py").write_text("# excluded\n")
found = {p.name for p in split_tests._find_ancestor_fixtures(nested)}
assert "common.py" in found
assert "outside.py" not in found
def test_find_ancestor_fixtures_walks_through_gaps(tmp_path: Path) -> None:
"""Ancestor conftests + helpers are collected across intermediate gaps."""
(tmp_path / "pyproject.toml").write_text("") # bound the walk
nested = tmp_path / "a" / "b" / "c"
nested.mkdir(parents=True)
# ``a/b`` has no fixtures, but ``a`` has both a conftest and a helper.
(tmp_path / "a" / "conftest.py").write_text("# a\n")
(tmp_path / "a" / "common.py").write_text("# a helper\n")
(tmp_path / "a" / "b" / "c" / "conftest.py").write_text("# c\n")
found = {
p.relative_to(tmp_path).as_posix()
for p in split_tests._find_ancestor_fixtures(nested)
}
# The walk starts at ``nested.parent`` (a/b); a/b/c/conftest.py is
# not an ancestor. Both ``a/conftest.py`` and ``a/common.py`` must
# be found despite a/b having no fixtures of its own.
assert "a/conftest.py" in found
assert "a/common.py" in found
assert "a/b/c/conftest.py" not in found
def test_file_fixture_hash_picks_up_ancestor_helper_above_root(
tmp_path: Path,
) -> None:
"""An ancestor non-conftest helper above root still busts descendant hashes.
A subtree run on ``components/`` must still invalidate when a shared
helper one level up (eg ``tests/components/common.py``) changes.
"""
(tmp_path / "pyproject.toml").write_text("") # bound the walk
(tmp_path / "common.py").write_text("# v1\n")
subtree = tmp_path / "components"
subtree.mkdir()
test_file = subtree / "test_x.py"
test_file.write_text("def test_x(): pass\n")
before = _fixture_hash_for(subtree, test_file)
(tmp_path / "common.py").write_text("# v2\n")
after = _fixture_hash_for(subtree, test_file)
assert before != after
def test_file_fixture_hash_picks_up_ancestor_conftest_across_gap(
tmp_path: Path,
) -> None:
"""An ancestor conftest across a gap still busts the descendant's hash."""
(tmp_path / "pyproject.toml").write_text("") # bound the walk
nested = tmp_path / "a" / "b"
nested.mkdir(parents=True)
(tmp_path / "a" / "conftest.py").write_text("# v1\n")
test_file = nested / "test_x.py"
test_file.write_text("def test_x(): pass\n")
before = _fixture_hash_for(nested, test_file)
(tmp_path / "a" / "conftest.py").write_text("# v2\n")
after = _fixture_hash_for(nested, test_file)
assert before != after
def test_file_fixture_hash_includes_ancestor_above_root(tmp_path: Path) -> None:
"""An ancestor conftest above root must still scope a subtree file."""
(tmp_path / "pyproject.toml").write_text("") # bound the walk
(tmp_path / "conftest.py").write_text("# parent\n")
subtree = tmp_path / "components"
subtree.mkdir()
test_file = subtree / "test_x.py"
test_file.write_text("def test_x(): pass\n")
before = _fixture_hash_for(subtree, test_file)
(tmp_path / "conftest.py").write_text("# parent changed\n")
after = _fixture_hash_for(subtree, test_file)
assert before != after
def test_walk_test_tree_separates_tests_from_fixtures(tree: Path) -> None:
"""The walker returns test_*.py files and every other .py as fixtures."""
test_files, fixtures = split_tests._walk_test_tree(tree)
test_names = {p.name for p in test_files}
fixture_paths = {p.relative_to(tree).as_posix() for p in fixtures}
assert test_names == {"test_one.py", "test_two.py", "test_x.py"}
assert fixture_paths == {
"conftest.py",
"common.py",
"components/alpha/conftest.py",
}
def test_walk_test_tree_skips_hidden_and_dunder_dirs(tmp_path: Path) -> None:
"""Hidden/dunder directories are pruned from the walk."""
(tmp_path / "__pycache__").mkdir()
(tmp_path / "__pycache__" / "test_ghost.py").write_text("def test_g(): pass\n")
(tmp_path / ".hidden").mkdir()
(tmp_path / ".hidden" / "test_invisible.py").write_text("def test_h(): pass\n")
(tmp_path / "test_real.py").write_text("def test_r(): pass\n")
test_files, _ = split_tests._walk_test_tree(tmp_path)
assert {p.name for p in test_files} == {"test_real.py"}
def test_collect_tests_rejects_single_file_root(tmp_path: Path) -> None:
"""A file root has no parent to anchor a TestFolder, so we exit early."""
file = tmp_path / "test_solo.py"
file.write_text("def test_x(): pass\n")
with pytest.raises(SystemExit):
split_tests.collect_tests(file)
def test_cache_roundtrip(tmp_path: Path) -> None:
"""A cache survives save → load."""
cache_path = tmp_path / "cache.json"
cache = split_tests._Cache(
entries={
"tests/alpha/test_a.py": split_tests._CacheEntry(
hash="h1", fixture_hash="f1", count=5
)
},
)
cache.save(cache_path)
loaded = split_tests._Cache.load(cache_path)
assert loaded.entries == cache.entries
def test_cache_load_missing_returns_empty(tmp_path: Path) -> None:
"""A missing cache file degrades gracefully to an empty cache."""
cache = split_tests._Cache.load(tmp_path / "missing.json")
assert cache.entries == {}
def test_cache_load_invalid_json_returns_empty(tmp_path: Path) -> None:
"""Corrupt JSON is treated as a cache miss instead of crashing."""
path = tmp_path / "broken.json"
path.write_text("{not json")
cache = split_tests._Cache.load(path)
assert cache.entries == {}
def test_cache_load_wrong_version_returns_empty(tmp_path: Path) -> None:
"""An older cache schema is discarded rather than misread."""
path = tmp_path / "old.json"
path.write_text(json.dumps({"version": 0, "files": {}}))
cache = split_tests._Cache.load(path)
assert cache.entries == {}
def test_cache_load_drops_malformed_entries(tmp_path: Path) -> None:
"""Malformed per-file entries are skipped, valid ones are kept."""
path = tmp_path / "cache.json"
path.write_text(
json.dumps(
{
"version": split_tests._CACHE_VERSION,
"files": {
"good.py": {"hash": "h1", "fixture_hash": "f1", "count": 3},
"bad_count.py": {
"hash": "h2",
"fixture_hash": "f2",
"count": "three",
},
"missing_hash.py": {"fixture_hash": "f3", "count": 4},
"missing_fixture_hash.py": {"hash": "h4", "count": 4},
"not_dict.py": 5,
"negative_count.py": {
"hash": "h5",
"fixture_hash": "f5",
"count": -1,
},
},
}
)
)
cache = split_tests._Cache.load(path)
assert set(cache.entries) == {"good.py"}
def test_cache_save_creates_parent_dir(tmp_path: Path) -> None:
"""Save mkdirs missing parent dirs so ``--cache foo/bar.json`` works."""
cache_path = tmp_path / "nested" / "subdir" / "cache.json"
split_tests._Cache(entries={}).save(cache_path)
assert cache_path.is_file()
def _resolve(
test_files: list[Path], cache: split_tests._Cache, tree: Path
) -> tuple[dict[Path, split_tests._CacheEntry], list[Path]]:
"""Run resolve_entries with a freshly indexed fixtures_by_dir."""
_, fixtures = split_tests._walk_test_tree(tree)
return split_tests._resolve_entries(
test_files,
cache,
tree,
split_tests._build_fixtures_by_dir(tree, fixtures),
)
def test_resolve_entries_hits_and_misses(tree: Path) -> None:
"""Files with matching content + fixture hashes are hits."""
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
alpha_one_hash = split_tests._hash_file(alpha_one)
alpha_one_fixture = _fixture_hash_for(tree, alpha_one)
cache = split_tests._Cache(
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=alpha_one_hash, fixture_hash=alpha_one_fixture, count=1
),
str(alpha_two.relative_to(tree)): split_tests._CacheEntry(
hash="stale", fixture_hash=alpha_one_fixture, count=99
),
},
)
entries, misses = _resolve([alpha_one, alpha_two, beta_x], cache, tree)
# Hit: cached entry passed through verbatim.
assert entries[alpha_one] == split_tests._CacheEntry(
hash=alpha_one_hash, fixture_hash=alpha_one_fixture, count=1
)
# Misses: fresh hashes plus a count=0 placeholder.
assert set(misses) == {alpha_two, beta_x}
assert entries[alpha_two].count == 0
assert entries[alpha_two].hash == split_tests._hash_file(alpha_two)
assert entries[beta_x].count == 0
assert entries[beta_x].hash == split_tests._hash_file(beta_x)
def test_resolve_entries_misses_on_fixture_drift(tree: Path) -> None:
"""A file with unchanged content but changed scope counts as a miss."""
alpha_one = tree / "components" / "alpha" / "test_one.py"
cache = split_tests._Cache(
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one),
fixture_hash="stale-fixture-hash",
count=1,
),
},
)
_, misses = _resolve([alpha_one], cache, tree)
assert misses == [alpha_one]
def test_resolve_entries_isolates_unrelated_dirs(tree: Path) -> None:
"""Editing a helper in one dir leaves files in other dirs as hits."""
alpha_dir = tree / "components" / "alpha"
beta_dir = tree / "components" / "beta"
# Helpers per dir, so a change in alpha doesn't bust beta.
(alpha_dir / "common.py").write_text("# alpha helper v1\n")
(beta_dir / "common.py").write_text("# beta helper v1\n")
alpha_one = alpha_dir / "test_one.py"
beta_x = beta_dir / "test_x.py"
# Snapshot cache entries with the v1 fixture state.
cache = split_tests._Cache(
entries={
str(alpha_one.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(alpha_one),
fixture_hash=_fixture_hash_for(tree, alpha_one),
count=1,
),
str(beta_x.relative_to(tree)): split_tests._CacheEntry(
hash=split_tests._hash_file(beta_x),
fixture_hash=_fixture_hash_for(tree, beta_x),
count=2,
),
},
)
# Now bust beta's helper; alpha's scope is unchanged, beta's isn't.
(beta_dir / "common.py").write_text("# beta helper v2\n")
_, misses = _resolve([alpha_one, beta_x], cache, tree)
assert misses == [beta_x]
def test_collect_tests_hashes_each_file_once(tree: Path) -> None:
"""Hits reuse the stored hash, misses reuse the resolve-time hash.
Guards against regressing the double-read on cache-miss rebuilds:
each test file should pass through _hash_file at most once per run.
"""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
# Prime with one hit so we exercise the file-level (not directory-level) miss path.
_prime_cache(cache_path, tree, hits={alpha_one: 1})
real_hash = split_tests._hash_file
counts: dict[Path, int] = {}
def counting_hash(path: Path) -> str:
counts[path] = counts.get(path, 0) + 1
return real_hash(path)
# Pin the threshold so the tiny tree stays on the file-level path.
with (
patch.object(split_tests, "_DIR_LEVEL_MISS_RATIO", 1.0),
patch.object(split_tests, "_hash_file", side_effect=counting_hash),
patch.object(
split_tests, "_run_collect_batches", side_effect=_echo_one_test_each()
),
):
split_tests.collect_tests(tree, cache_path)
assert all(n == 1 for n in counts.values()), counts
def test_collect_tests_warm_cache_skips_pytest(tree: Path) -> None:
"""A warm cache with no diffs should skip the pytest subprocess entirely."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
_prime_cache(cache_path, tree, hits={alpha_one: 1, alpha_two: 2, beta_x: 3})
with patch.object(split_tests, "_run_collect_batches") as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
run_batches.assert_not_called()
assert folder.total_tests == 6
def test_collect_tests_cold_cache_collects_only_missing(tree: Path) -> None:
"""A partial cache should only re-collect the files that changed."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
_prime_cache(cache_path, tree, hits={alpha_one: 1})
with (
patch.object(split_tests, "_DIR_LEVEL_MISS_RATIO", 1.0),
patch.object(
split_tests, "_run_collect_batches", side_effect=_echo_one_test_each()
) as run_batches,
):
folder = split_tests.collect_tests(tree, cache_path)
assert run_batches.call_count == 1
requested = set(run_batches.call_args.args[0])
assert requested == {alpha_two, beta_x}
assert folder.total_tests == 3
# Cache should now contain entries for every test file.
saved = json.loads(cache_path.read_text())
assert set(saved["files"]) == {
str(alpha_one.relative_to(tree)),
str(alpha_two.relative_to(tree)),
str(beta_x.relative_to(tree)),
}
def test_collect_tests_falls_back_to_dirs_when_misses_dominate(tree: Path) -> None:
"""Heavy misses should switch back to dir-level invocation."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
_prime_cache(cache_path, tree, hits={alpha_one: 1})
# 2 misses / 3 total = 67% miss, above the 30% default threshold; this
# also covers the new-directory PR case (mostly-new test files).
with patch.object(
split_tests, "_run_collect_batches", side_effect=_echo_one_test_each()
) as run_batches:
split_tests.collect_tests(tree, cache_path)
# We expect the dir-level batch paths, not the individual miss files.
requested = set(run_batches.call_args.args[0])
assert requested == set(split_tests._enumerate_batch_paths(tree))
def test_collect_tests_caches_files_with_no_collected_tests(tree: Path) -> None:
"""Files pytest returns nothing for are cached as 0 so we stop re-collecting them.
Helper modules named test_*.py with no actual test functions look like
test files to the walker but pytest reports no tests for them. We
want the cache to remember that and skip them on subsequent runs.
"""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
alpha_two = tree / "components" / "alpha" / "test_two.py"
beta_x = tree / "components" / "beta" / "test_x.py"
# Prime the cache with one hit so collect_tests takes the file-level
# diff path; the cold-cache path hands pytest top-level directories
# rather than individual file paths.
_prime_cache(cache_path, tree, hits={alpha_one: 1})
with (
patch.object(split_tests, "_DIR_LEVEL_MISS_RATIO", 1.0),
patch.object(
split_tests,
"_run_collect_batches",
side_effect=_echo_one_test_each(skip={alpha_two}),
),
):
split_tests.collect_tests(tree, cache_path)
saved = json.loads(cache_path.read_text())
assert saved["files"][str(alpha_two.relative_to(tree))]["count"] == 0
assert saved["files"][str(alpha_one.relative_to(tree))]["count"] == 1
assert saved["files"][str(beta_x.relative_to(tree))]["count"] == 1
# Re-running with the same content should now be a full cache hit
# even though alpha_two has no tests.
with patch.object(split_tests, "_run_collect_batches") as run_batches:
folder = split_tests.collect_tests(tree, cache_path)
run_batches.assert_not_called()
# alpha_two contributes 0, only alpha_one + beta_x count.
assert folder.total_tests == 2
def test_collect_tests_drops_deleted_files_from_cache(tree: Path) -> None:
"""Files that disappear from disk are dropped from the saved cache."""
cache_path = tree / "cache.json"
alpha_one = tree / "components" / "alpha" / "test_one.py"
ghost_rel = "components/alpha/test_ghost.py"
_prime_cache(
cache_path,
tree,
hits={alpha_one: 1},
extra_entries={
ghost_rel: split_tests._CacheEntry(
hash="dead", fixture_hash="dead", count=42
)
},
)
with (
patch.object(split_tests, "_DIR_LEVEL_MISS_RATIO", 1.0),
patch.object(
split_tests, "_run_collect_batches", side_effect=_echo_one_test_each()
),
):
split_tests.collect_tests(tree, cache_path)
saved = json.loads(cache_path.read_text())
assert ghost_rel not in saved["files"]
def _build_folder(tree: Path, counts: dict[Path, int]) -> split_tests.TestFolder:
"""Build a TestFolder for ``tree`` populated with ``counts``."""
folder = split_tests.TestFolder(tree)
for path, n in counts.items():
folder.add_test_file(split_tests.TestFile(n, path))
return folder
def test_split_tests_keeps_siblings_together_when_snapshots_present(
tmp_path: Path,
) -> None:
"""Same-dir files stay together when the folder has syrupy snapshots."""
one = tmp_path / "alpha" / "test_one.py"
two = tmp_path / "alpha" / "test_two.py"
one.parent.mkdir(parents=True)
one.touch()
two.touch()
# Add a snapshot so the syrupy constraint kicks in.
snapshots = tmp_path / "alpha" / "snapshots"
snapshots.mkdir()
(snapshots / "test_one.ambr").write_text("")
folder = _build_folder(tmp_path, {one: 60, two: 60})
holder = split_tests.BucketHolder(tests_per_bucket=50, bucket_count=3)
holder.split_tests(folder)
# Both files must end up in one bucket; the other two stay empty.
sizes = sorted(b.total_tests for b in holder._buckets)
assert sizes == [0, 0, 120]
def test_split_tests_splits_siblings_when_no_snapshots(tmp_path: Path) -> None:
"""Same-dir files split freely across buckets when no snapshots exist."""
one = tmp_path / "alpha" / "test_one.py"
two = tmp_path / "alpha" / "test_two.py"
one.parent.mkdir(parents=True)
one.touch()
two.touch()
# No snapshots dir → free to split.
folder = _build_folder(tmp_path, {one: 60, two: 60})
holder = split_tests.BucketHolder(tests_per_bucket=70, bucket_count=2)
holder.split_tests(folder)
sizes = sorted(b.total_tests for b in holder._buckets)
assert sizes == [60, 60]