mirror of
https://github.com/home-assistant/core.git
synced 2026-04-14 21:56:16 +02:00
Compare commits
1 Commits
trigger_ad
...
remove_dep
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
65f073ca15 |
@@ -11,10 +11,21 @@ import threading
|
||||
|
||||
from .backup_restore import restore_backup
|
||||
from .const import REQUIRED_PYTHON_VER, RESTART_EXIT_CODE, __version__
|
||||
from .util.package import is_docker_env, is_virtual_env
|
||||
|
||||
FAULT_LOG_FILENAME = "home-assistant.log.fault"
|
||||
|
||||
|
||||
def validate_environment() -> None:
|
||||
"""Validate that Home Assistant is started from a container or a venv."""
|
||||
if not is_virtual_env() and not is_docker_env():
|
||||
print(
|
||||
"Home Assistant must be run in a Python virtual environment or a container.",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def validate_os() -> None:
|
||||
"""Validate that Home Assistant is running in a supported operating system."""
|
||||
if not sys.platform.startswith(("darwin", "linux")):
|
||||
@@ -40,8 +51,6 @@ def ensure_config_path(config_dir: str) -> None:
|
||||
"""Validate the configuration directory."""
|
||||
from . import config as config_util # noqa: PLC0415
|
||||
|
||||
lib_dir = os.path.join(config_dir, "deps")
|
||||
|
||||
# Test if configuration directory exists
|
||||
if not os.path.isdir(config_dir):
|
||||
if config_dir != config_util.get_default_config_dir():
|
||||
@@ -65,17 +74,6 @@ def ensure_config_path(config_dir: str) -> None:
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Test if library directory exists
|
||||
if not os.path.isdir(lib_dir):
|
||||
try:
|
||||
os.mkdir(lib_dir)
|
||||
except OSError as ex:
|
||||
print(
|
||||
f"Fatal Error: Unable to create library directory {lib_dir}: {ex}",
|
||||
file=sys.stderr,
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def get_arguments() -> argparse.Namespace:
|
||||
"""Get parsed passed in arguments."""
|
||||
@@ -168,6 +166,7 @@ def check_threads() -> None:
|
||||
def main() -> int:
|
||||
"""Start Home Assistant."""
|
||||
validate_python()
|
||||
validate_environment()
|
||||
|
||||
args = get_arguments()
|
||||
|
||||
|
||||
@@ -108,7 +108,7 @@ from .setup import (
|
||||
from .util.async_ import create_eager_task
|
||||
from .util.hass_dict import HassKey
|
||||
from .util.logging import async_activate_log_queue_handler
|
||||
from .util.package import async_get_user_site, is_docker_env, is_virtual_env
|
||||
from .util.package import is_docker_env
|
||||
from .util.system_info import is_official_image
|
||||
|
||||
with contextlib.suppress(ImportError):
|
||||
@@ -353,9 +353,6 @@ async def async_setup_hass(
|
||||
err,
|
||||
)
|
||||
else:
|
||||
if not is_virtual_env():
|
||||
await async_mount_local_lib_path(runtime_config.config_dir)
|
||||
|
||||
if hass.config.safe_mode:
|
||||
_LOGGER.info("Starting in safe mode")
|
||||
|
||||
@@ -704,17 +701,6 @@ class _RotatingFileHandlerWithoutShouldRollOver(RotatingFileHandler):
|
||||
return False
|
||||
|
||||
|
||||
async def async_mount_local_lib_path(config_dir: str) -> str:
|
||||
"""Add local library to Python Path.
|
||||
|
||||
This function is a coroutine.
|
||||
"""
|
||||
deps_dir = os.path.join(config_dir, "deps")
|
||||
if (lib_dir := await async_get_user_site(deps_dir)) not in sys.path:
|
||||
sys.path.insert(0, lib_dir)
|
||||
return deps_dir
|
||||
|
||||
|
||||
def _get_domains(hass: core.HomeAssistant, config: dict[str, Any]) -> set[str]:
|
||||
"""Get domains of components to set up."""
|
||||
# The common config section [homeassistant] could be filtered here,
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
{
|
||||
"common": {
|
||||
"condition_behavior_name": "Condition passes if",
|
||||
"trigger_behavior_name": "Trigger when",
|
||||
"trigger_for_name": "For at least"
|
||||
"trigger_behavior_name": "Trigger when"
|
||||
},
|
||||
"conditions": {
|
||||
"is_detected": {
|
||||
@@ -46,9 +45,6 @@
|
||||
"fields": {
|
||||
"behavior": {
|
||||
"name": "[%key:component::motion::common::trigger_behavior_name%]"
|
||||
},
|
||||
"for": {
|
||||
"name": "[%key:component::motion::common::trigger_for_name%]"
|
||||
}
|
||||
},
|
||||
"name": "Motion cleared"
|
||||
@@ -58,9 +54,6 @@
|
||||
"fields": {
|
||||
"behavior": {
|
||||
"name": "[%key:component::motion::common::trigger_behavior_name%]"
|
||||
},
|
||||
"for": {
|
||||
"name": "[%key:component::motion::common::trigger_for_name%]"
|
||||
}
|
||||
},
|
||||
"name": "Motion detected"
|
||||
|
||||
@@ -9,11 +9,6 @@
|
||||
- first
|
||||
- last
|
||||
- any
|
||||
for:
|
||||
required: true
|
||||
default: 00:00:00
|
||||
selector:
|
||||
duration:
|
||||
|
||||
detected:
|
||||
fields: *trigger_common_fields
|
||||
|
||||
@@ -13,7 +13,6 @@ import logging
|
||||
import operator
|
||||
import os
|
||||
from pathlib import Path
|
||||
import shutil
|
||||
from types import ModuleType
|
||||
from typing import TYPE_CHECKING, Any, Literal, overload
|
||||
|
||||
@@ -32,7 +31,6 @@ from .helpers.typing import ConfigType
|
||||
from .loader import ComponentProtocol, Integration, IntegrationNotFound
|
||||
from .requirements import RequirementsNotFound, async_get_integration_with_requirements
|
||||
from .util.async_ import create_eager_task
|
||||
from .util.package import is_docker_env
|
||||
from .util.yaml import SECRET_YAML, Secrets, YamlTypeError, load_yaml_dict
|
||||
from .util.yaml.objects import NodeStrClass
|
||||
|
||||
@@ -308,12 +306,6 @@ def process_ha_config_upgrade(hass: HomeAssistant) -> None:
|
||||
|
||||
version_obj = AwesomeVersion(conf_version)
|
||||
|
||||
if version_obj < AwesomeVersion("0.50"):
|
||||
# 0.50 introduced persistent deps dir.
|
||||
lib_path = hass.config.path("deps")
|
||||
if os.path.isdir(lib_path):
|
||||
shutil.rmtree(lib_path)
|
||||
|
||||
if version_obj < AwesomeVersion("0.92"):
|
||||
# 0.92 moved google/tts.py to google_translate/tts.py
|
||||
config_path = hass.config.path(YAML_CONFIG_FILE)
|
||||
@@ -330,13 +322,6 @@ def process_ha_config_upgrade(hass: HomeAssistant) -> None:
|
||||
except OSError:
|
||||
_LOGGER.exception("Migrating to google_translate tts failed")
|
||||
|
||||
if version_obj < AwesomeVersion("0.94") and is_docker_env():
|
||||
# In 0.94 we no longer install packages inside the deps folder when
|
||||
# running inside a Docker container.
|
||||
lib_path = hass.config.path("deps")
|
||||
if os.path.isdir(lib_path):
|
||||
shutil.rmtree(lib_path)
|
||||
|
||||
with open(version_path, "w", encoding="utf8") as outp:
|
||||
outp.write(__version__)
|
||||
|
||||
|
||||
@@ -7,7 +7,6 @@ import asyncio
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable, Coroutine, Iterable, Mapping
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import timedelta
|
||||
import functools
|
||||
import inspect
|
||||
import logging
|
||||
@@ -32,7 +31,6 @@ from homeassistant.const import (
|
||||
CONF_ENABLED,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_EVENT_DATA,
|
||||
CONF_FOR,
|
||||
CONF_ID,
|
||||
CONF_OPTIONS,
|
||||
CONF_PLATFORM,
|
||||
@@ -76,7 +74,6 @@ from .automation import (
|
||||
get_relative_description_key,
|
||||
move_options_fields_to_top_level,
|
||||
)
|
||||
from .event import async_track_same_state
|
||||
from .integration_platform import async_process_integration_platforms
|
||||
from .selector import (
|
||||
NumericThresholdMode,
|
||||
@@ -343,7 +340,6 @@ ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST = ENTITY_STATE_TRIGGER_SCHEMA.extend(
|
||||
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
|
||||
[BEHAVIOR_FIRST, BEHAVIOR_LAST, BEHAVIOR_ANY]
|
||||
),
|
||||
vol.Optional(CONF_FOR): cv.positive_time_period_dict,
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -369,7 +365,6 @@ class EntityTriggerBase(Trigger):
|
||||
if TYPE_CHECKING:
|
||||
assert config.target is not None
|
||||
self._options = config.options or {}
|
||||
self._duration: timedelta | None = self._options.get(CONF_FOR)
|
||||
self._target = config.target
|
||||
|
||||
def entity_filter(self, entities: set[str]) -> set[str]:
|
||||
@@ -399,12 +394,15 @@ class EntityTriggerBase(Trigger):
|
||||
if (state := self._hass.states.get(entity_id)) is not None
|
||||
)
|
||||
|
||||
def count_matches(self, entity_ids: set[str]) -> int:
|
||||
"""Count the number of entity states that match."""
|
||||
return sum(
|
||||
self.is_valid_state(state)
|
||||
for entity_id in entity_ids
|
||||
if (state := self._hass.states.get(entity_id)) is not None
|
||||
def check_one_match(self, entity_ids: set[str]) -> bool:
|
||||
"""Check that only one entity state matches."""
|
||||
return (
|
||||
sum(
|
||||
self.is_valid_state(state)
|
||||
for entity_id in entity_ids
|
||||
if (state := self._hass.states.get(entity_id)) is not None
|
||||
)
|
||||
== 1
|
||||
)
|
||||
|
||||
@override
|
||||
@@ -413,8 +411,7 @@ class EntityTriggerBase(Trigger):
|
||||
) -> CALLBACK_TYPE:
|
||||
"""Attach the trigger to an action runner."""
|
||||
|
||||
behavior: str = self._options.get(ATTR_BEHAVIOR, BEHAVIOR_ANY)
|
||||
unsub_track_same: dict[str, Callable[[], None]] = {}
|
||||
behavior = self._options.get(ATTR_BEHAVIOR)
|
||||
|
||||
@callback
|
||||
def state_change_listener(
|
||||
@@ -426,31 +423,6 @@ class EntityTriggerBase(Trigger):
|
||||
from_state = event.data["old_state"]
|
||||
to_state = event.data["new_state"]
|
||||
|
||||
def state_still_valid(
|
||||
_: str, from_state: State | None, to_state: State | None
|
||||
) -> bool:
|
||||
"""Check if the state is still valid during the duration wait.
|
||||
|
||||
Called by async_track_same_state on each state change to
|
||||
determine whether to cancel the timer.
|
||||
For behavior any, checks the individual entity's state.
|
||||
For behavior first/last, checks the combined state.
|
||||
"""
|
||||
if not from_state or not to_state:
|
||||
return False
|
||||
|
||||
if behavior == BEHAVIOR_LAST:
|
||||
return self.check_all_match(
|
||||
target_state_change_data.targeted_entity_ids
|
||||
)
|
||||
if behavior == BEHAVIOR_FIRST:
|
||||
return (
|
||||
self.count_matches(target_state_change_data.targeted_entity_ids)
|
||||
>= 1
|
||||
)
|
||||
# Behavior any: check the individual entity's state
|
||||
return self.is_valid_state(to_state)
|
||||
|
||||
if not from_state or not to_state:
|
||||
return
|
||||
|
||||
@@ -468,59 +440,25 @@ class EntityTriggerBase(Trigger):
|
||||
):
|
||||
return
|
||||
elif behavior == BEHAVIOR_FIRST:
|
||||
if (
|
||||
self.count_matches(target_state_change_data.targeted_entity_ids)
|
||||
!= 1
|
||||
if not self.check_one_match(
|
||||
target_state_change_data.targeted_entity_ids
|
||||
):
|
||||
return
|
||||
|
||||
@callback
|
||||
def call_action() -> None:
|
||||
"""Call action with right context."""
|
||||
# After a `for` delay, keep the original triggering event payload.
|
||||
# `async_track_same_state` only verifies the state remained valid
|
||||
# for the configured duration before firing the action.
|
||||
run_action(
|
||||
{
|
||||
ATTR_ENTITY_ID: entity_id,
|
||||
"from_state": from_state,
|
||||
"to_state": to_state,
|
||||
},
|
||||
f"state of {entity_id}",
|
||||
event.context,
|
||||
)
|
||||
|
||||
if not self._duration:
|
||||
call_action()
|
||||
return
|
||||
|
||||
subscription_key = entity_id if behavior == BEHAVIOR_ANY else behavior
|
||||
if subscription_key in unsub_track_same:
|
||||
unsub_track_same.pop(subscription_key)()
|
||||
unsub_track_same[subscription_key] = async_track_same_state(
|
||||
self._hass,
|
||||
self._duration,
|
||||
call_action,
|
||||
state_still_valid,
|
||||
entity_ids=entity_id
|
||||
if behavior == BEHAVIOR_ANY
|
||||
else target_state_change_data.targeted_entity_ids,
|
||||
run_action(
|
||||
{
|
||||
ATTR_ENTITY_ID: entity_id,
|
||||
"from_state": from_state,
|
||||
"to_state": to_state,
|
||||
},
|
||||
f"state of {entity_id}",
|
||||
event.context,
|
||||
)
|
||||
|
||||
unsub = async_track_target_selector_state_change_event(
|
||||
return async_track_target_selector_state_change_event(
|
||||
self._hass, self._target, state_change_listener, self.entity_filter
|
||||
)
|
||||
|
||||
@callback
|
||||
def async_remove() -> None:
|
||||
"""Remove state listeners async."""
|
||||
unsub()
|
||||
for async_remove in unsub_track_same.values():
|
||||
async_remove()
|
||||
unsub_track_same.clear()
|
||||
|
||||
return async_remove
|
||||
|
||||
|
||||
class EntityTargetStateTriggerBase(EntityTriggerBase):
|
||||
"""Trigger for entity state changes to a specific state.
|
||||
|
||||
@@ -94,16 +94,12 @@ def async_clear_install_history(hass: HomeAssistant) -> None:
|
||||
_async_get_manager(hass).install_failure_history.clear()
|
||||
|
||||
|
||||
def pip_kwargs(config_dir: str | None) -> dict[str, Any]:
|
||||
def pip_kwargs() -> dict[str, Any]:
|
||||
"""Return keyword arguments for PIP install."""
|
||||
is_docker = pkg_util.is_docker_env()
|
||||
kwargs = {
|
||||
return {
|
||||
"constraints": os.path.join(os.path.dirname(__file__), CONSTRAINT_FILE),
|
||||
"timeout": PIP_TIMEOUT,
|
||||
}
|
||||
if not (config_dir is None or pkg_util.is_virtual_env()) and not is_docker:
|
||||
kwargs["target"] = os.path.join(config_dir, "deps")
|
||||
return kwargs
|
||||
|
||||
|
||||
def _install_with_retry(requirement: str, kwargs: dict[str, Any]) -> bool:
|
||||
@@ -336,7 +332,7 @@ class RequirementsManager:
|
||||
requirements: list[str],
|
||||
) -> None:
|
||||
"""Install a requirement and save failures."""
|
||||
kwargs = pip_kwargs(self.hass.config.config_dir)
|
||||
kwargs = pip_kwargs()
|
||||
installed, failures = await self.hass.async_add_executor_job(
|
||||
_install_requirements_if_missing, requirements, kwargs
|
||||
)
|
||||
|
||||
@@ -11,10 +11,9 @@ import os
|
||||
import sys
|
||||
|
||||
from homeassistant import runner
|
||||
from homeassistant.bootstrap import async_mount_local_lib_path
|
||||
from homeassistant.config import get_default_config_dir
|
||||
from homeassistant.requirements import pip_kwargs
|
||||
from homeassistant.util.package import install_package, is_installed, is_virtual_env
|
||||
from homeassistant.util.package import install_package, is_installed
|
||||
|
||||
# mypy: allow-untyped-defs, disallow-any-generics, no-warn-return-any
|
||||
|
||||
@@ -44,12 +43,7 @@ def run(args: list[str]) -> int:
|
||||
|
||||
script = importlib.import_module(f"homeassistant.scripts.{args[0]}")
|
||||
|
||||
config_dir = extract_config_dir()
|
||||
|
||||
if not is_virtual_env():
|
||||
asyncio.run(async_mount_local_lib_path(config_dir))
|
||||
|
||||
_pip_kwargs = pip_kwargs(config_dir)
|
||||
_pip_kwargs = pip_kwargs()
|
||||
|
||||
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
|
||||
|
||||
|
||||
@@ -149,7 +149,7 @@ def run(script_args: list) -> int:
|
||||
yaml_files = [
|
||||
f
|
||||
for f in glob(os.path.join(config_dir, "**/*.yaml"), recursive=True)
|
||||
if not f.startswith(deps)
|
||||
if not f.startswith(deps) # Avoid scanning legacy deps folder
|
||||
]
|
||||
|
||||
for yfn in sorted(yaml_files):
|
||||
|
||||
@@ -2,13 +2,10 @@
|
||||
|
||||
from collections.abc import Mapping
|
||||
from contextlib import AbstractContextManager, nullcontext as does_not_raise
|
||||
import datetime
|
||||
import io
|
||||
import logging
|
||||
from typing import Any
|
||||
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
from pytest_unordered import unordered
|
||||
import voluptuous as vol
|
||||
@@ -22,12 +19,9 @@ from homeassistant.const import (
|
||||
ATTR_DEVICE_CLASS,
|
||||
ATTR_UNIT_OF_MEASUREMENT,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_FOR,
|
||||
CONF_OPTIONS,
|
||||
CONF_PLATFORM,
|
||||
CONF_TARGET,
|
||||
STATE_OFF,
|
||||
STATE_ON,
|
||||
STATE_UNAVAILABLE,
|
||||
STATE_UNKNOWN,
|
||||
UnitOfTemperature,
|
||||
@@ -47,10 +41,6 @@ from homeassistant.helpers.automation import (
|
||||
move_top_level_schema_fields_to_options,
|
||||
)
|
||||
from homeassistant.helpers.trigger import (
|
||||
ATTR_BEHAVIOR,
|
||||
BEHAVIOR_ANY,
|
||||
BEHAVIOR_FIRST,
|
||||
BEHAVIOR_LAST,
|
||||
DATA_PLUGGABLE_ACTIONS,
|
||||
TRIGGERS,
|
||||
EntityNumericalStateChangedTriggerWithUnitBase,
|
||||
@@ -75,13 +65,7 @@ from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util.unit_conversion import TemperatureConverter
|
||||
from homeassistant.util.yaml.loader import parse_yaml
|
||||
|
||||
from tests.common import (
|
||||
MockModule,
|
||||
MockPlatform,
|
||||
async_fire_time_changed,
|
||||
mock_integration,
|
||||
mock_platform,
|
||||
)
|
||||
from tests.common import MockModule, MockPlatform, mock_integration, mock_platform
|
||||
from tests.typing import WebSocketGenerator
|
||||
|
||||
|
||||
@@ -3096,617 +3080,3 @@ async def test_make_entity_origin_state_trigger(
|
||||
|
||||
# To-state still matches from_state — not valid
|
||||
assert not trig.is_valid_state(from_state)
|
||||
|
||||
|
||||
class _OnOffTrigger(EntityTriggerBase):
|
||||
"""Test trigger that fires when state becomes 'on'."""
|
||||
|
||||
_domain_specs = {"test": DomainSpec()}
|
||||
|
||||
def is_valid_transition(self, from_state: State, to_state: State) -> bool:
|
||||
"""Valid if transitioning from a non-'on' state."""
|
||||
if from_state.state in (STATE_UNAVAILABLE, STATE_UNKNOWN):
|
||||
return False
|
||||
return from_state.state != STATE_ON
|
||||
|
||||
def is_valid_state(self, state: State) -> bool:
|
||||
"""Valid if the state is 'on'."""
|
||||
return state.state == STATE_ON
|
||||
|
||||
|
||||
async def _arm_on_off_trigger(
|
||||
hass: HomeAssistant,
|
||||
entity_ids: list[str],
|
||||
behavior: str,
|
||||
calls: list[dict[str, Any]],
|
||||
duration: dict[str, int] | None,
|
||||
) -> CALLBACK_TYPE:
|
||||
"""Set up _OnOffTrigger via async_initialize_triggers."""
|
||||
|
||||
async def async_get_triggers(
|
||||
hass: HomeAssistant,
|
||||
) -> dict[str, type[Trigger]]:
|
||||
return {"on_off": _OnOffTrigger}
|
||||
|
||||
mock_integration(hass, MockModule("test"))
|
||||
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
|
||||
|
||||
options: dict[str, Any] = {ATTR_BEHAVIOR: behavior}
|
||||
if duration is not None:
|
||||
options[CONF_FOR] = duration
|
||||
|
||||
trigger_config = {
|
||||
CONF_PLATFORM: "test.on_off",
|
||||
CONF_TARGET: {CONF_ENTITY_ID: entity_ids},
|
||||
CONF_OPTIONS: options,
|
||||
}
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@callback
|
||||
def action(run_variables: dict[str, Any], context: Context | None = None) -> None:
|
||||
calls.append(run_variables["trigger"])
|
||||
|
||||
validated_config = await async_validate_trigger_config(hass, [trigger_config])
|
||||
return await async_initialize_triggers(
|
||||
hass,
|
||||
validated_config,
|
||||
action,
|
||||
domain="test",
|
||||
name="test_on_off",
|
||||
log_cb=log.log,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
|
||||
async def test_entity_trigger_no_duration(hass: HomeAssistant, behavior: str) -> None:
|
||||
"""Test EntityTriggerBase fires immediately without duration."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(hass, [entity_id], behavior, calls, duration=None)
|
||||
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_id
|
||||
|
||||
# Transition back and trigger again
|
||||
calls.clear()
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
|
||||
async def test_entity_trigger_with_duration(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
|
||||
) -> None:
|
||||
"""Test EntityTriggerBase waits for duration before firing."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_id], behavior, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on — should NOT fire immediately
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Advance time past duration — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_id
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
|
||||
async def test_entity_trigger_duration_cancelled_on_state_change(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
|
||||
) -> None:
|
||||
"""Test that the duration timer is cancelled if state changes back."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_id], behavior, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on, then back off before duration expires
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past the original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_any_independent(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior any tracks per-entity durations independently."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn A on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Turn B on 2 seconds later
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# After 5s from A's turn-on, A should fire
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_a
|
||||
|
||||
# After 5s from B's turn-on (2 more seconds), B should fire
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 2
|
||||
assert calls[1]["entity_id"] == entity_b
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_any_entity_off_cancels_only_that_entity(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior any: turning off one entity doesn't cancel the other's timer."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn A off after 2 seconds — cancels A's timer but not B's
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# After 5s total, B should fire but A should not
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_b
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_last_requires_all(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior last: trigger fires only when ALL entities are on for duration."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn only A on — should not start timer (not all match)
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Turn B on — now all match, timer starts
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_last_cancelled_when_one_turns_off(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior last: timer is cancelled when one entity turns off."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn A off after 2 seconds
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_last_timer_reset(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior last: timer resets when combined state goes off and back on."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on — combined state "all on", timer starts
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# After 2 seconds, B turns off — combined state breaks, timer cancelled
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# B turns back on — combined state restored, timer restarts
|
||||
freezer.tick(datetime.timedelta(seconds=1))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# 4 seconds after restart (not enough) — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=4))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# 1 more second (5 total from restart) — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=1))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_fires_when_any_on(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: trigger fires when first entity turns on for duration."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn A on — combined state goes to "at least one on", timer starts
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Advance past duration — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_not_cancelled_by_second(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: second entity turning on doesn't restart timer."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn A on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn B on 3 seconds later — combined state was already "any on",
|
||||
# so this should NOT restart the timer
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# 2 more seconds (5 total from A) — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_not_cancelled_by_partial_off(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: one entity off doesn't cancel if another is still on."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn A off after 2 seconds — combined state still "any on" (B is on)
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past duration — should still fire (combined state never went to "none on")
|
||||
freezer.tick(datetime.timedelta(seconds=4))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_cancelled_when_all_off(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: timer cancelled when ALL entities turn off."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn both off after 2 seconds — combined state goes to "none on"
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_any_retrigger_resets_timer(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior any: turning an entity off and on resets its timer."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_id], BEHAVIOR_ANY, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# After 3 seconds, turn off and on again — resets the timer
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# 3 more seconds (6 from start, but only 3 from retrigger) — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# 2 more seconds (5 from retrigger) — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
def set_or_remove_state(hass: HomeAssistant, entity_id: str, state: str | None) -> None:
|
||||
"""Helper to set or remove state based on whether state is None."""
|
||||
if state is None:
|
||||
hass.states.async_remove(entity_id)
|
||||
else:
|
||||
hass.states.async_set(entity_id, state)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_state",
|
||||
[STATE_UNAVAILABLE, STATE_UNKNOWN, None],
|
||||
ids=["unavailable", "unknown", "removed"],
|
||||
)
|
||||
async def test_entity_trigger_duration_cancelled_on_invalid_state(
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
behavior: str,
|
||||
invalid_state: str | None,
|
||||
) -> None:
|
||||
"""Test that the duration timer is cancelled if entity becomes unavailable, unknown, or is removed."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
set_or_remove_state(hass, entity_a, STATE_OFF)
|
||||
set_or_remove_state(hass, entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(
|
||||
hass, [entity_a, entity_b], behavior, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on the entities needed to start the timer
|
||||
set_or_remove_state(hass, entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
if behavior == BEHAVIOR_LAST:
|
||||
set_or_remove_state(hass, entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Entity A becomes invalid during the wait
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
set_or_remove_state(hass, entity_a, invalid_state)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past the original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
|
||||
@pytest.mark.parametrize(
|
||||
"initial_state",
|
||||
[STATE_UNAVAILABLE, STATE_UNKNOWN, None],
|
||||
ids=["unavailable", "unknown", "no_state"],
|
||||
)
|
||||
async def test_entity_trigger_from_invalid_initial_state(
|
||||
hass: HomeAssistant, behavior: str, initial_state: str | None
|
||||
) -> None:
|
||||
"""Test that the trigger does not fire when transitioning from unavailable, unknown, or no state."""
|
||||
entity_id = "test.entity_1"
|
||||
set_or_remove_state(hass, entity_id, initial_state)
|
||||
# If initial_state is None, don't set any state — entity doesn't exist
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_on_off_trigger(hass, [entity_id], behavior, calls, duration=None)
|
||||
|
||||
# Transition to "on" from the invalid initial state
|
||||
set_or_remove_state(hass, entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Should NOT fire — transition from invalid state is rejected
|
||||
assert len(calls) == 0
|
||||
|
||||
# Now transition back to off and then to on — should fire
|
||||
set_or_remove_state(hass, entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
set_or_remove_state(hass, entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
@@ -52,29 +52,6 @@ async def test_requirement_installed_in_venv(hass: HomeAssistant) -> None:
|
||||
)
|
||||
|
||||
|
||||
async def test_requirement_installed_in_deps(hass: HomeAssistant) -> None:
|
||||
"""Test requirement installed in deps directory."""
|
||||
with (
|
||||
patch("os.path.dirname", return_value="ha_package_path"),
|
||||
patch("homeassistant.util.package.is_virtual_env", return_value=False),
|
||||
patch("homeassistant.util.package.is_docker_env", return_value=False),
|
||||
patch(
|
||||
"homeassistant.util.package.install_package", return_value=True
|
||||
) as mock_install,
|
||||
patch.dict(os.environ, env_without_wheel_links(), clear=True),
|
||||
):
|
||||
hass.config.skip_pip = False
|
||||
mock_integration(hass, MockModule("comp", requirements=["package==0.0.1"]))
|
||||
assert await setup.async_setup_component(hass, "comp", {})
|
||||
assert "comp" in hass.config.components
|
||||
assert mock_install.call_args == call(
|
||||
"package==0.0.1",
|
||||
target=hass.config.path("deps"),
|
||||
constraints=os.path.join("ha_package_path", CONSTRAINT_FILE),
|
||||
timeout=60,
|
||||
)
|
||||
|
||||
|
||||
async def test_install_existing_package(hass: HomeAssistant) -> None:
|
||||
"""Test an install attempt on an existing package."""
|
||||
with patch(
|
||||
|
||||
Reference in New Issue
Block a user