mirror of
https://github.com/home-assistant/core.git
synced 2026-04-16 14:46:15 +02:00
Compare commits
11 Commits
python-3.1
...
dependabot
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dc3778bfbe | ||
|
|
a9becca321 | ||
|
|
0043a307f0 | ||
|
|
dfb1819800 | ||
|
|
12018cf9f4 | ||
|
|
70368c622e | ||
|
|
743aef05be | ||
|
|
49e5b03c08 | ||
|
|
6bc3fcef36 | ||
|
|
e3e87185c5 | ||
|
|
6d83b73cbb |
@@ -186,15 +186,11 @@ If `CHANGE_TYPE` IS "Breaking change" or "Deprecation", keep the `## Breaking ch
|
||||
|
||||
## Step 10: Push Branch and Create PR
|
||||
|
||||
```bash
|
||||
# Get branch name and GitHub username
|
||||
BRANCH=$(git branch --show-current)
|
||||
PUSH_REMOTE=$(git config "branch.$BRANCH.remote" 2>/dev/null || git remote | head -1)
|
||||
GITHUB_USER=$(gh api user --jq .login 2>/dev/null || git remote get-url "$PUSH_REMOTE" | sed -E 's#.*[:/]([^/]+)/([^/]+)(\.git)?$#\1#')
|
||||
Push the branch with upstream tracking, and create a PR against `home-assistant/core` with the generated title and body:
|
||||
|
||||
```bash
|
||||
# Create PR (gh pr create pushes the branch automatically)
|
||||
gh pr create --repo home-assistant/core --base dev \
|
||||
--head "$GITHUB_USER:$BRANCH" \
|
||||
--draft \
|
||||
--title "TITLE_HERE" \
|
||||
--body "$(cat <<'EOF'
|
||||
|
||||
44
.github/renovate.json
vendored
44
.github/renovate.json
vendored
@@ -78,6 +78,50 @@
|
||||
"enabled": true,
|
||||
"labels": ["dependency", "core"]
|
||||
},
|
||||
{
|
||||
"description": "Common Python utilities (allowlisted)",
|
||||
"matchPackageNames": [
|
||||
"astral",
|
||||
"atomicwrites-homeassistant",
|
||||
"audioop-lts",
|
||||
"awesomeversion",
|
||||
"bcrypt",
|
||||
"ciso8601",
|
||||
"cronsim",
|
||||
"defusedxml",
|
||||
"fnv-hash-fast",
|
||||
"getmac",
|
||||
"ical",
|
||||
"ifaddr",
|
||||
"lru-dict",
|
||||
"mutagen",
|
||||
"propcache",
|
||||
"pyserial",
|
||||
"python-slugify",
|
||||
"PyTurboJPEG",
|
||||
"securetar",
|
||||
"standard-aifc",
|
||||
"standard-telnetlib",
|
||||
"ulid-transform",
|
||||
"url-normalize",
|
||||
"xmltodict"
|
||||
],
|
||||
"enabled": true,
|
||||
"labels": ["dependency"]
|
||||
},
|
||||
{
|
||||
"description": "Home Assistant ecosystem packages (core-maintained, no cooldown)",
|
||||
"matchPackageNames": [
|
||||
"hassil",
|
||||
"home-assistant-bluetooth",
|
||||
"home-assistant-frontend",
|
||||
"home-assistant-intents",
|
||||
"infrared-protocols"
|
||||
],
|
||||
"enabled": true,
|
||||
"minimumReleaseAge": null,
|
||||
"labels": ["dependency", "core"]
|
||||
},
|
||||
{
|
||||
"description": "Test dependencies (allowlisted)",
|
||||
"matchPackageNames": [
|
||||
|
||||
@@ -21,7 +21,7 @@ jobs:
|
||||
steps:
|
||||
- name: Check if integration label was added and extract details
|
||||
id: extract
|
||||
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
|
||||
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
|
||||
with:
|
||||
script: |
|
||||
// Debug: Log the event payload
|
||||
@@ -118,7 +118,7 @@ jobs:
|
||||
- name: Fetch similar issues
|
||||
id: fetch_similar
|
||||
if: steps.extract.outputs.should_continue == 'true'
|
||||
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
|
||||
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
|
||||
env:
|
||||
INTEGRATION_LABELS: ${{ steps.extract.outputs.integration_labels }}
|
||||
CURRENT_NUMBER: ${{ steps.extract.outputs.current_number }}
|
||||
@@ -285,7 +285,7 @@ jobs:
|
||||
- name: Post duplicate detection results
|
||||
id: post_results
|
||||
if: steps.extract.outputs.should_continue == 'true' && steps.fetch_similar.outputs.has_similar == 'true'
|
||||
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
|
||||
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
|
||||
env:
|
||||
AI_RESPONSE: ${{ steps.ai_detection.outputs.response }}
|
||||
SIMILAR_ISSUES: ${{ steps.fetch_similar.outputs.similar_issues }}
|
||||
|
||||
@@ -21,7 +21,7 @@ jobs:
|
||||
steps:
|
||||
- name: Check issue language
|
||||
id: detect_language
|
||||
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
|
||||
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
|
||||
env:
|
||||
ISSUE_NUMBER: ${{ github.event.issue.number }}
|
||||
ISSUE_TITLE: ${{ github.event.issue.title }}
|
||||
@@ -95,7 +95,7 @@ jobs:
|
||||
|
||||
- name: Process non-English issues
|
||||
if: steps.detect_language.outputs.should_continue == 'true'
|
||||
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
|
||||
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
|
||||
env:
|
||||
AI_RESPONSE: ${{ steps.ai_language_detection.outputs.response }}
|
||||
ISSUE_NUMBER: ${{ steps.detect_language.outputs.issue_number }}
|
||||
|
||||
4
.github/workflows/restrict-task-creation.yml
vendored
4
.github/workflows/restrict-task-creation.yml
vendored
@@ -22,7 +22,7 @@ jobs:
|
||||
|| github.event.issue.type.name == 'Opportunity'
|
||||
steps:
|
||||
- name: Add no-stale label
|
||||
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
|
||||
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
|
||||
with:
|
||||
script: |
|
||||
await github.rest.issues.addLabels({
|
||||
@@ -42,7 +42,7 @@ jobs:
|
||||
if: github.event.issue.type.name == 'Task'
|
||||
steps:
|
||||
- name: Check if user is authorized
|
||||
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
|
||||
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
|
||||
with:
|
||||
script: |
|
||||
const issueAuthor = context.payload.issue.user.login;
|
||||
|
||||
@@ -6,10 +6,11 @@ from typing import Final
|
||||
|
||||
from homeassistant.const import STATE_OFF, STATE_ON
|
||||
|
||||
CONF_READ_TIMEOUT: Final = "timeout"
|
||||
CONF_WRITE_TIMEOUT: Final = "write_timeout"
|
||||
|
||||
DEFAULT_NAME: Final = "Acer Projector"
|
||||
DEFAULT_TIMEOUT: Final = 1
|
||||
DEFAULT_READ_TIMEOUT: Final = 1
|
||||
DEFAULT_WRITE_TIMEOUT: Final = 1
|
||||
|
||||
ECO_MODE: Final = "ECO Mode"
|
||||
|
||||
@@ -5,5 +5,5 @@
|
||||
"documentation": "https://www.home-assistant.io/integrations/acer_projector",
|
||||
"iot_class": "local_polling",
|
||||
"quality_scale": "legacy",
|
||||
"requirements": ["pyserial==3.5"]
|
||||
"requirements": ["serialx==1.2.2"]
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import logging
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
import serial
|
||||
from serialx import Serial, SerialException
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.switch import (
|
||||
@@ -16,21 +16,22 @@ from homeassistant.components.switch import (
|
||||
from homeassistant.const import (
|
||||
CONF_FILENAME,
|
||||
CONF_NAME,
|
||||
CONF_TIMEOUT,
|
||||
STATE_OFF,
|
||||
STATE_ON,
|
||||
STATE_UNKNOWN,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.exceptions import HomeAssistantError
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
from .const import (
|
||||
CMD_DICT,
|
||||
CONF_READ_TIMEOUT,
|
||||
CONF_WRITE_TIMEOUT,
|
||||
DEFAULT_NAME,
|
||||
DEFAULT_TIMEOUT,
|
||||
DEFAULT_READ_TIMEOUT,
|
||||
DEFAULT_WRITE_TIMEOUT,
|
||||
ECO_MODE,
|
||||
ICON,
|
||||
@@ -45,7 +46,7 @@ PLATFORM_SCHEMA = SWITCH_PLATFORM_SCHEMA.extend(
|
||||
{
|
||||
vol.Required(CONF_FILENAME): cv.isdevice,
|
||||
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
|
||||
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
|
||||
vol.Optional(CONF_READ_TIMEOUT, default=DEFAULT_READ_TIMEOUT): cv.positive_int,
|
||||
vol.Optional(
|
||||
CONF_WRITE_TIMEOUT, default=DEFAULT_WRITE_TIMEOUT
|
||||
): cv.positive_int,
|
||||
@@ -62,10 +63,10 @@ def setup_platform(
|
||||
"""Connect with serial port and return Acer Projector."""
|
||||
serial_port = config[CONF_FILENAME]
|
||||
name = config[CONF_NAME]
|
||||
timeout = config[CONF_TIMEOUT]
|
||||
read_timeout = config[CONF_READ_TIMEOUT]
|
||||
write_timeout = config[CONF_WRITE_TIMEOUT]
|
||||
|
||||
add_entities([AcerSwitch(serial_port, name, timeout, write_timeout)], True)
|
||||
add_entities([AcerSwitch(serial_port, name, read_timeout, write_timeout)], True)
|
||||
|
||||
|
||||
class AcerSwitch(SwitchEntity):
|
||||
@@ -77,14 +78,14 @@ class AcerSwitch(SwitchEntity):
|
||||
self,
|
||||
serial_port: str,
|
||||
name: str,
|
||||
timeout: int,
|
||||
read_timeout: int,
|
||||
write_timeout: int,
|
||||
) -> None:
|
||||
"""Init of the Acer projector."""
|
||||
self.serial = serial.Serial(
|
||||
port=serial_port, timeout=timeout, write_timeout=write_timeout
|
||||
)
|
||||
self._serial_port = serial_port
|
||||
self._read_timeout = read_timeout
|
||||
self._write_timeout = write_timeout
|
||||
|
||||
self._attr_name = name
|
||||
self._attributes = {
|
||||
LAMP_HOURS: STATE_UNKNOWN,
|
||||
@@ -94,22 +95,26 @@ class AcerSwitch(SwitchEntity):
|
||||
|
||||
def _write_read(self, msg: str) -> str:
|
||||
"""Write to the projector and read the return."""
|
||||
ret = ""
|
||||
|
||||
# Sometimes the projector won't answer for no reason or the projector
|
||||
# was disconnected during runtime.
|
||||
# This way the projector can be reconnected and will still work
|
||||
try:
|
||||
if not self.serial.is_open:
|
||||
self.serial.open()
|
||||
self.serial.write(msg.encode("utf-8"))
|
||||
# Size is an experience value there is no real limit.
|
||||
# AFAIK there is no limit and no end character so we will usually
|
||||
# need to wait for timeout
|
||||
ret = self.serial.read_until(size=20).decode("utf-8")
|
||||
except serial.SerialException:
|
||||
_LOGGER.error("Problem communicating with %s", self._serial_port)
|
||||
self.serial.close()
|
||||
return ret
|
||||
with Serial.from_url(
|
||||
self._serial_port,
|
||||
read_timeout=self._read_timeout,
|
||||
write_timeout=self._write_timeout,
|
||||
) as serial:
|
||||
serial.write(msg.encode("utf-8"))
|
||||
|
||||
# Size is an experience value there is no real limit.
|
||||
# AFAIK there is no limit and no end character so we will usually
|
||||
# need to wait for timeout
|
||||
return serial.read_until(size=20).decode("utf-8")
|
||||
except (OSError, SerialException, TimeoutError) as exc:
|
||||
raise HomeAssistantError(
|
||||
f"Problem communicating with {self._serial_port}"
|
||||
) from exc
|
||||
|
||||
def _write_read_format(self, msg: str) -> str:
|
||||
"""Write msg, obtain answer and format output."""
|
||||
|
||||
@@ -7,5 +7,5 @@
|
||||
"documentation": "https://www.home-assistant.io/integrations/camera",
|
||||
"integration_type": "entity",
|
||||
"quality_scale": "internal",
|
||||
"requirements": ["PyTurboJPEG==1.8.0"]
|
||||
"requirements": ["PyTurboJPEG==1.8.3"]
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.entity import Entity
|
||||
|
||||
from .const import DOMAIN, EVENT_HDMI_CEC_UNAVAILABLE
|
||||
@@ -55,9 +56,10 @@ class CecEntity(Entity):
|
||||
else:
|
||||
self._attr_name = f"{self._device.type_name} {self._logical_address} ({self._device.osd_name})"
|
||||
|
||||
@callback
|
||||
def _hdmi_cec_unavailable(self, callback_event):
|
||||
self._attr_available = False
|
||||
self.schedule_update_ha_state(False)
|
||||
self.async_write_ha_state()
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""Register HDMI callbacks after initialization."""
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from pycec.commands import CecCommand, KeyPressCommand, KeyReleaseCommand
|
||||
from pycec.const import (
|
||||
@@ -31,7 +30,6 @@ from homeassistant.components.media_player import (
|
||||
MediaPlayerEntity,
|
||||
MediaPlayerEntityFeature,
|
||||
MediaPlayerState,
|
||||
MediaType,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
@@ -45,20 +43,20 @@ _LOGGER = logging.getLogger(__name__)
|
||||
ENTITY_ID_FORMAT = MP_DOMAIN + ".{}"
|
||||
|
||||
|
||||
def setup_platform(
|
||||
async def async_setup_platform(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
discovery_info: DiscoveryInfoType | None = None,
|
||||
) -> None:
|
||||
"""Find and return HDMI devices as +switches."""
|
||||
"""Find and return HDMI devices as media players."""
|
||||
if discovery_info and ATTR_NEW in discovery_info:
|
||||
_LOGGER.debug("Setting up HDMI devices %s", discovery_info[ATTR_NEW])
|
||||
entities = []
|
||||
for device in discovery_info[ATTR_NEW]:
|
||||
hdmi_device = hass.data[DOMAIN][device]
|
||||
entities.append(CecPlayerEntity(hdmi_device, hdmi_device.logical_address))
|
||||
add_entities(entities, True)
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
class CecPlayerEntity(CecEntity, MediaPlayerEntity):
|
||||
@@ -79,78 +77,61 @@ class CecPlayerEntity(CecEntity, MediaPlayerEntity):
|
||||
|
||||
def send_playback(self, key):
|
||||
"""Send playback status to CEC adapter."""
|
||||
self._device.async_send_command(CecCommand(key, dst=self._logical_address))
|
||||
self._device.send_command(CecCommand(key, dst=self._logical_address))
|
||||
|
||||
def mute_volume(self, mute: bool) -> None:
|
||||
async def async_mute_volume(self, mute: bool) -> None:
|
||||
"""Mute volume."""
|
||||
self.send_keypress(KEY_MUTE_TOGGLE)
|
||||
|
||||
def media_previous_track(self) -> None:
|
||||
async def async_media_previous_track(self) -> None:
|
||||
"""Go to previous track."""
|
||||
self.send_keypress(KEY_BACKWARD)
|
||||
|
||||
def turn_on(self) -> None:
|
||||
async def async_turn_on(self) -> None:
|
||||
"""Turn device on."""
|
||||
self._device.turn_on()
|
||||
self._attr_state = MediaPlayerState.ON
|
||||
self.async_write_ha_state()
|
||||
|
||||
def clear_playlist(self) -> None:
|
||||
"""Clear players playlist."""
|
||||
raise NotImplementedError
|
||||
|
||||
def turn_off(self) -> None:
|
||||
async def async_turn_off(self) -> None:
|
||||
"""Turn device off."""
|
||||
self._device.turn_off()
|
||||
self._attr_state = MediaPlayerState.OFF
|
||||
self.async_write_ha_state()
|
||||
|
||||
def media_stop(self) -> None:
|
||||
async def async_media_stop(self) -> None:
|
||||
"""Stop playback."""
|
||||
self.send_keypress(KEY_STOP)
|
||||
self._attr_state = MediaPlayerState.IDLE
|
||||
self.async_write_ha_state()
|
||||
|
||||
def play_media(
|
||||
self, media_type: MediaType | str, media_id: str, **kwargs: Any
|
||||
) -> None:
|
||||
"""Not supported."""
|
||||
raise NotImplementedError
|
||||
|
||||
def media_next_track(self) -> None:
|
||||
async def async_media_next_track(self) -> None:
|
||||
"""Skip to next track."""
|
||||
self.send_keypress(KEY_FORWARD)
|
||||
|
||||
def media_seek(self, position: float) -> None:
|
||||
"""Not supported."""
|
||||
raise NotImplementedError
|
||||
|
||||
def set_volume_level(self, volume: float) -> None:
|
||||
"""Set volume level, range 0..1."""
|
||||
raise NotImplementedError
|
||||
|
||||
def media_pause(self) -> None:
|
||||
async def async_media_pause(self) -> None:
|
||||
"""Pause playback."""
|
||||
self.send_keypress(KEY_PAUSE)
|
||||
self._attr_state = MediaPlayerState.PAUSED
|
||||
self.async_write_ha_state()
|
||||
|
||||
def select_source(self, source: str) -> None:
|
||||
"""Not supported."""
|
||||
raise NotImplementedError
|
||||
|
||||
def media_play(self) -> None:
|
||||
async def async_media_play(self) -> None:
|
||||
"""Start playback."""
|
||||
self.send_keypress(KEY_PLAY)
|
||||
self._attr_state = MediaPlayerState.PLAYING
|
||||
self.async_write_ha_state()
|
||||
|
||||
def volume_up(self) -> None:
|
||||
async def async_volume_up(self) -> None:
|
||||
"""Increase volume."""
|
||||
_LOGGER.debug("%s: volume up", self._logical_address)
|
||||
self.send_keypress(KEY_VOLUME_UP)
|
||||
|
||||
def volume_down(self) -> None:
|
||||
async def async_volume_down(self) -> None:
|
||||
"""Decrease volume."""
|
||||
_LOGGER.debug("%s: volume down", self._logical_address)
|
||||
self.send_keypress(KEY_VOLUME_DOWN)
|
||||
|
||||
def update(self) -> None:
|
||||
async def async_update(self) -> None:
|
||||
"""Update device status."""
|
||||
device = self._device
|
||||
if device.power_status in [POWER_OFF, 3]:
|
||||
|
||||
@@ -20,10 +20,10 @@ _LOGGER = logging.getLogger(__name__)
|
||||
ENTITY_ID_FORMAT = SWITCH_DOMAIN + ".{}"
|
||||
|
||||
|
||||
def setup_platform(
|
||||
async def async_setup_platform(
|
||||
hass: HomeAssistant,
|
||||
config: ConfigType,
|
||||
add_entities: AddEntitiesCallback,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
discovery_info: DiscoveryInfoType | None = None,
|
||||
) -> None:
|
||||
"""Find and return HDMI devices as switches."""
|
||||
@@ -33,7 +33,7 @@ def setup_platform(
|
||||
for device in discovery_info[ATTR_NEW]:
|
||||
hdmi_device = hass.data[DOMAIN][device]
|
||||
entities.append(CecSwitchEntity(hdmi_device, hdmi_device.logical_address))
|
||||
add_entities(entities, True)
|
||||
async_add_entities(entities, True)
|
||||
|
||||
|
||||
class CecSwitchEntity(CecEntity, SwitchEntity):
|
||||
@@ -44,19 +44,19 @@ class CecSwitchEntity(CecEntity, SwitchEntity):
|
||||
CecEntity.__init__(self, device, logical)
|
||||
self.entity_id = f"{SWITCH_DOMAIN}.hdmi_{hex(self._logical_address)[2:]}"
|
||||
|
||||
def turn_on(self, **kwargs: Any) -> None:
|
||||
async def async_turn_on(self, **kwargs: Any) -> None:
|
||||
"""Turn device on."""
|
||||
self._device.turn_on()
|
||||
self._attr_is_on = True
|
||||
self.schedule_update_ha_state(force_refresh=False)
|
||||
self.async_write_ha_state()
|
||||
|
||||
def turn_off(self, **kwargs: Any) -> None:
|
||||
async def async_turn_off(self, **kwargs: Any) -> None:
|
||||
"""Turn device off."""
|
||||
self._device.turn_off()
|
||||
self._attr_is_on = False
|
||||
self.schedule_update_ha_state(force_refresh=False)
|
||||
self.async_write_ha_state()
|
||||
|
||||
def update(self) -> None:
|
||||
async def async_update(self) -> None:
|
||||
"""Update device status."""
|
||||
device = self._device
|
||||
if device.power_status in {POWER_OFF, 3}:
|
||||
|
||||
@@ -10,7 +10,7 @@
|
||||
"loggers": ["pyhap"],
|
||||
"requirements": [
|
||||
"HAP-python==5.0.0",
|
||||
"fnv-hash-fast==2.0.0",
|
||||
"fnv-hash-fast==2.0.2",
|
||||
"homekit-audio-proxy==1.2.1",
|
||||
"PyQRCode==1.2.1",
|
||||
"base36==0.1.1"
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
{
|
||||
"common": {
|
||||
"condition_behavior_name": "Condition passes if",
|
||||
"trigger_behavior_name": "Trigger when"
|
||||
"trigger_behavior_name": "Trigger when",
|
||||
"trigger_for_name": "For at least"
|
||||
},
|
||||
"conditions": {
|
||||
"is_detected": {
|
||||
@@ -45,6 +46,9 @@
|
||||
"fields": {
|
||||
"behavior": {
|
||||
"name": "[%key:component::motion::common::trigger_behavior_name%]"
|
||||
},
|
||||
"for": {
|
||||
"name": "[%key:component::motion::common::trigger_for_name%]"
|
||||
}
|
||||
},
|
||||
"name": "Motion cleared"
|
||||
@@ -54,6 +58,9 @@
|
||||
"fields": {
|
||||
"behavior": {
|
||||
"name": "[%key:component::motion::common::trigger_behavior_name%]"
|
||||
},
|
||||
"for": {
|
||||
"name": "[%key:component::motion::common::trigger_for_name%]"
|
||||
}
|
||||
},
|
||||
"name": "Motion detected"
|
||||
|
||||
@@ -9,6 +9,11 @@
|
||||
- first
|
||||
- last
|
||||
- any
|
||||
for:
|
||||
required: true
|
||||
default: 00:00:00
|
||||
selector:
|
||||
duration:
|
||||
|
||||
detected:
|
||||
fields: *trigger_common_fields
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
"quality_scale": "internal",
|
||||
"requirements": [
|
||||
"SQLAlchemy==2.0.49",
|
||||
"fnv-hash-fast==2.0.0",
|
||||
"fnv-hash-fast==2.0.2",
|
||||
"psutil-home-assistant==0.0.1"
|
||||
]
|
||||
}
|
||||
|
||||
@@ -4,5 +4,5 @@
|
||||
"codeowners": ["@fabaff"],
|
||||
"documentation": "https://www.home-assistant.io/integrations/serial",
|
||||
"iot_class": "local_polling",
|
||||
"requirements": ["pyserial-asyncio-fast==0.16"]
|
||||
"requirements": ["serialx==1.2.2"]
|
||||
}
|
||||
|
||||
@@ -3,11 +3,11 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from asyncio import Task
|
||||
import json
|
||||
import logging
|
||||
|
||||
from serial import SerialException
|
||||
import serial_asyncio_fast as serial_asyncio
|
||||
from serialx import Parity, SerialException, StopBits, open_serial_connection
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant.components.sensor import (
|
||||
@@ -18,6 +18,7 @@ from homeassistant.const import CONF_NAME, CONF_VALUE_TEMPLATE, EVENT_HOMEASSIST
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers import config_validation as cv
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.template import Template
|
||||
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@@ -33,9 +34,9 @@ CONF_DSRDTR = "dsrdtr"
|
||||
|
||||
DEFAULT_NAME = "Serial Sensor"
|
||||
DEFAULT_BAUDRATE = 9600
|
||||
DEFAULT_BYTESIZE = serial_asyncio.serial.EIGHTBITS
|
||||
DEFAULT_PARITY = serial_asyncio.serial.PARITY_NONE
|
||||
DEFAULT_STOPBITS = serial_asyncio.serial.STOPBITS_ONE
|
||||
DEFAULT_BYTESIZE = 8
|
||||
DEFAULT_PARITY = Parity.NONE
|
||||
DEFAULT_STOPBITS = StopBits.ONE
|
||||
DEFAULT_XONXOFF = False
|
||||
DEFAULT_RTSCTS = False
|
||||
DEFAULT_DSRDTR = False
|
||||
@@ -46,28 +47,21 @@ PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
|
||||
vol.Optional(CONF_BAUDRATE, default=DEFAULT_BAUDRATE): cv.positive_int,
|
||||
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
|
||||
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
|
||||
vol.Optional(CONF_BYTESIZE, default=DEFAULT_BYTESIZE): vol.In(
|
||||
[
|
||||
serial_asyncio.serial.FIVEBITS,
|
||||
serial_asyncio.serial.SIXBITS,
|
||||
serial_asyncio.serial.SEVENBITS,
|
||||
serial_asyncio.serial.EIGHTBITS,
|
||||
]
|
||||
),
|
||||
vol.Optional(CONF_BYTESIZE, default=DEFAULT_BYTESIZE): vol.In([5, 6, 7, 8]),
|
||||
vol.Optional(CONF_PARITY, default=DEFAULT_PARITY): vol.In(
|
||||
[
|
||||
serial_asyncio.serial.PARITY_NONE,
|
||||
serial_asyncio.serial.PARITY_EVEN,
|
||||
serial_asyncio.serial.PARITY_ODD,
|
||||
serial_asyncio.serial.PARITY_MARK,
|
||||
serial_asyncio.serial.PARITY_SPACE,
|
||||
Parity.NONE,
|
||||
Parity.EVEN,
|
||||
Parity.ODD,
|
||||
Parity.MARK,
|
||||
Parity.SPACE,
|
||||
]
|
||||
),
|
||||
vol.Optional(CONF_STOPBITS, default=DEFAULT_STOPBITS): vol.In(
|
||||
[
|
||||
serial_asyncio.serial.STOPBITS_ONE,
|
||||
serial_asyncio.serial.STOPBITS_ONE_POINT_FIVE,
|
||||
serial_asyncio.serial.STOPBITS_TWO,
|
||||
StopBits.ONE,
|
||||
StopBits.ONE_POINT_FIVE,
|
||||
StopBits.TWO,
|
||||
]
|
||||
),
|
||||
vol.Optional(CONF_XONXOFF, default=DEFAULT_XONXOFF): cv.boolean,
|
||||
@@ -84,28 +78,17 @@ async def async_setup_platform(
|
||||
discovery_info: DiscoveryInfoType | None = None,
|
||||
) -> None:
|
||||
"""Set up the Serial sensor platform."""
|
||||
name = config.get(CONF_NAME)
|
||||
port = config.get(CONF_SERIAL_PORT)
|
||||
baudrate = config.get(CONF_BAUDRATE)
|
||||
bytesize = config.get(CONF_BYTESIZE)
|
||||
parity = config.get(CONF_PARITY)
|
||||
stopbits = config.get(CONF_STOPBITS)
|
||||
xonxoff = config.get(CONF_XONXOFF)
|
||||
rtscts = config.get(CONF_RTSCTS)
|
||||
dsrdtr = config.get(CONF_DSRDTR)
|
||||
value_template = config.get(CONF_VALUE_TEMPLATE)
|
||||
|
||||
sensor = SerialSensor(
|
||||
name,
|
||||
port,
|
||||
baudrate,
|
||||
bytesize,
|
||||
parity,
|
||||
stopbits,
|
||||
xonxoff,
|
||||
rtscts,
|
||||
dsrdtr,
|
||||
value_template,
|
||||
name=config[CONF_NAME],
|
||||
port=config[CONF_SERIAL_PORT],
|
||||
baudrate=config[CONF_BAUDRATE],
|
||||
bytesize=config[CONF_BYTESIZE],
|
||||
parity=config[CONF_PARITY],
|
||||
stopbits=config[CONF_STOPBITS],
|
||||
xonxoff=config[CONF_XONXOFF],
|
||||
rtscts=config[CONF_RTSCTS],
|
||||
dsrdtr=config[CONF_DSRDTR],
|
||||
value_template=config.get(CONF_VALUE_TEMPLATE),
|
||||
)
|
||||
|
||||
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, sensor.stop_serial_read)
|
||||
@@ -119,17 +102,17 @@ class SerialSensor(SensorEntity):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name,
|
||||
port,
|
||||
baudrate,
|
||||
bytesize,
|
||||
parity,
|
||||
stopbits,
|
||||
xonxoff,
|
||||
rtscts,
|
||||
dsrdtr,
|
||||
value_template,
|
||||
):
|
||||
name: str,
|
||||
port: str,
|
||||
baudrate: int,
|
||||
bytesize: int,
|
||||
parity: Parity,
|
||||
stopbits: StopBits,
|
||||
xonxoff: bool,
|
||||
rtscts: bool,
|
||||
dsrdtr: bool,
|
||||
value_template: Template | None,
|
||||
) -> None:
|
||||
"""Initialize the Serial sensor."""
|
||||
self._attr_name = name
|
||||
self._port = port
|
||||
@@ -140,12 +123,12 @@ class SerialSensor(SensorEntity):
|
||||
self._xonxoff = xonxoff
|
||||
self._rtscts = rtscts
|
||||
self._dsrdtr = dsrdtr
|
||||
self._serial_loop_task = None
|
||||
self._serial_loop_task: Task[None] | None = None
|
||||
self._template = value_template
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""Handle when an entity is about to be added to Home Assistant."""
|
||||
self._serial_loop_task = self.hass.loop.create_task(
|
||||
self._serial_loop_task = self.hass.async_create_background_task(
|
||||
self.serial_read(
|
||||
self._port,
|
||||
self._baudrate,
|
||||
@@ -155,26 +138,31 @@ class SerialSensor(SensorEntity):
|
||||
self._xonxoff,
|
||||
self._rtscts,
|
||||
self._dsrdtr,
|
||||
)
|
||||
),
|
||||
"Serial reader",
|
||||
)
|
||||
|
||||
async def serial_read(
|
||||
self,
|
||||
device,
|
||||
baudrate,
|
||||
bytesize,
|
||||
parity,
|
||||
stopbits,
|
||||
xonxoff,
|
||||
rtscts,
|
||||
dsrdtr,
|
||||
device: str,
|
||||
baudrate: int,
|
||||
bytesize: int,
|
||||
parity: Parity,
|
||||
stopbits: StopBits,
|
||||
xonxoff: bool,
|
||||
rtscts: bool,
|
||||
dsrdtr: bool,
|
||||
**kwargs,
|
||||
):
|
||||
"""Read the data from the port."""
|
||||
logged_error = False
|
||||
|
||||
while True:
|
||||
reader = None
|
||||
writer = None
|
||||
|
||||
try:
|
||||
reader, _ = await serial_asyncio.open_serial_connection(
|
||||
reader, writer = await open_serial_connection(
|
||||
url=device,
|
||||
baudrate=baudrate,
|
||||
bytesize=bytesize,
|
||||
@@ -185,8 +173,7 @@ class SerialSensor(SensorEntity):
|
||||
dsrdtr=dsrdtr,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
except SerialException:
|
||||
except OSError, SerialException, TimeoutError:
|
||||
if not logged_error:
|
||||
_LOGGER.exception(
|
||||
"Unable to connect to the serial device %s. Will retry", device
|
||||
@@ -197,15 +184,15 @@ class SerialSensor(SensorEntity):
|
||||
_LOGGER.debug("Serial device %s connected", device)
|
||||
while True:
|
||||
try:
|
||||
line = await reader.readline()
|
||||
except SerialException:
|
||||
line_bytes = await reader.readline()
|
||||
except OSError, SerialException:
|
||||
_LOGGER.exception(
|
||||
"Error while reading serial device %s", device
|
||||
)
|
||||
await self._handle_error()
|
||||
break
|
||||
else:
|
||||
line = line.decode("utf-8").strip()
|
||||
line = line_bytes.decode("utf-8").strip()
|
||||
|
||||
try:
|
||||
data = json.loads(line)
|
||||
@@ -223,6 +210,10 @@ class SerialSensor(SensorEntity):
|
||||
_LOGGER.debug("Received: %s", line)
|
||||
self._attr_native_value = line
|
||||
self.async_write_ha_state()
|
||||
finally:
|
||||
if writer is not None:
|
||||
writer.close()
|
||||
await writer.wait_closed()
|
||||
|
||||
async def _handle_error(self):
|
||||
"""Handle error for serial connection."""
|
||||
|
||||
@@ -7,5 +7,5 @@
|
||||
"integration_type": "system",
|
||||
"iot_class": "local_push",
|
||||
"quality_scale": "internal",
|
||||
"requirements": ["PyTurboJPEG==1.8.0", "av==16.0.1", "numpy==2.3.2"]
|
||||
"requirements": ["PyTurboJPEG==1.8.3", "av==16.0.1", "numpy==2.3.2"]
|
||||
}
|
||||
|
||||
@@ -8,5 +8,5 @@
|
||||
"iot_class": "cloud_polling",
|
||||
"loggers": ["twentemilieu"],
|
||||
"quality_scale": "silver",
|
||||
"requirements": ["twentemilieu==2.2.1"]
|
||||
"requirements": ["twentemilieu==3.0.0"]
|
||||
}
|
||||
|
||||
@@ -26,24 +26,20 @@ from homeassistant.core import (
|
||||
from homeassistant.helpers import config_validation as cv, discovery_flow
|
||||
from homeassistant.helpers.debounce import Debouncer
|
||||
from homeassistant.helpers.event import async_track_time_interval
|
||||
from homeassistant.helpers.service_info.usb import UsbServiceInfo as _UsbServiceInfo
|
||||
from homeassistant.helpers.service_info.usb import UsbServiceInfo
|
||||
from homeassistant.helpers.typing import ConfigType
|
||||
from homeassistant.loader import USBMatcher, async_get_usb
|
||||
from homeassistant.util.hass_dict import HassKey
|
||||
|
||||
from .const import DOMAIN
|
||||
from .models import (
|
||||
SerialDevice, # noqa: F401
|
||||
USBDevice,
|
||||
)
|
||||
from .models import SerialDevice, USBDevice
|
||||
from .utils import (
|
||||
async_scan_serial_ports,
|
||||
scan_serial_ports, # noqa: F401
|
||||
usb_device_from_path, # noqa: F401
|
||||
usb_device_from_port, # noqa: F401
|
||||
scan_serial_ports,
|
||||
usb_device_from_path,
|
||||
usb_device_matches_matcher,
|
||||
usb_service_info_from_device,
|
||||
usb_unique_id_from_service_info, # noqa: F401
|
||||
usb_unique_id_from_service_info,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
@@ -56,9 +52,17 @@ REQUEST_SCAN_COOLDOWN = 10 # 10 second cooldown
|
||||
ADD_REMOVE_SCAN_COOLDOWN = 5 # 5 second cooldown to give devices a chance to register
|
||||
|
||||
__all__ = [
|
||||
"SerialDevice",
|
||||
"USBCallbackMatcher",
|
||||
"USBDevice",
|
||||
"async_register_port_event_callback",
|
||||
"async_register_scan_request_callback",
|
||||
"async_scan_serial_ports",
|
||||
"scan_serial_ports",
|
||||
"usb_device_from_path",
|
||||
"usb_device_matches_matcher",
|
||||
"usb_service_info_from_device",
|
||||
"usb_unique_id_from_service_info",
|
||||
]
|
||||
|
||||
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
|
||||
@@ -358,7 +362,7 @@ class USBDiscovery:
|
||||
|
||||
for matcher in matched:
|
||||
for flow in self.hass.config_entries.flow.async_progress_by_init_data_type(
|
||||
_UsbServiceInfo,
|
||||
UsbServiceInfo,
|
||||
lambda flow_service_info: flow_service_info == service_info,
|
||||
):
|
||||
if matcher["domain"] != flow["handler"]:
|
||||
|
||||
@@ -7,5 +7,5 @@
|
||||
"integration_type": "system",
|
||||
"iot_class": "local_push",
|
||||
"quality_scale": "internal",
|
||||
"requirements": ["aiousbwatcher==1.1.1", "pyserial==3.5"]
|
||||
"requirements": ["aiousbwatcher==1.1.1", "serialx==1.2.2"]
|
||||
}
|
||||
|
||||
@@ -3,12 +3,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Sequence
|
||||
import dataclasses
|
||||
import fnmatch
|
||||
import os
|
||||
|
||||
from serial.tools.list_ports import comports
|
||||
from serial.tools.list_ports_common import ListPortInfo
|
||||
from serialx import SerialPortInfo, list_serial_ports
|
||||
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.helpers.service_info.usb import UsbServiceInfo
|
||||
@@ -17,8 +15,8 @@ from homeassistant.loader import USBMatcher
|
||||
from .models import SerialDevice, USBDevice
|
||||
|
||||
|
||||
def usb_device_from_port(port: ListPortInfo) -> USBDevice:
|
||||
"""Convert serial ListPortInfo to USBDevice."""
|
||||
def usb_device_from_port(port: SerialPortInfo) -> USBDevice:
|
||||
"""Convert serialx SerialPortInfo to USBDevice."""
|
||||
assert port.vid is not None
|
||||
assert port.pid is not None
|
||||
|
||||
@@ -28,53 +26,30 @@ def usb_device_from_port(port: ListPortInfo) -> USBDevice:
|
||||
pid=f"{hex(port.pid)[2:]:0>4}".upper(),
|
||||
serial_number=port.serial_number,
|
||||
manufacturer=port.manufacturer,
|
||||
description=port.description,
|
||||
description=port.product,
|
||||
)
|
||||
|
||||
|
||||
def serial_device_from_port(port: ListPortInfo) -> SerialDevice:
|
||||
"""Convert serial ListPortInfo to SerialDevice."""
|
||||
def serial_device_from_port(port: SerialPortInfo) -> SerialDevice:
|
||||
"""Convert serialx SerialPortInfo to SerialDevice."""
|
||||
return SerialDevice(
|
||||
device=port.device,
|
||||
serial_number=port.serial_number,
|
||||
manufacturer=port.manufacturer,
|
||||
description=port.description,
|
||||
description=port.product,
|
||||
)
|
||||
|
||||
|
||||
def usb_serial_device_from_port(port: ListPortInfo) -> USBDevice | SerialDevice:
|
||||
"""Convert serial ListPortInfo to USBDevice or SerialDevice."""
|
||||
if port.vid is not None or port.pid is not None:
|
||||
assert port.vid is not None
|
||||
assert port.pid is not None
|
||||
|
||||
def usb_serial_device_from_port(port: SerialPortInfo) -> USBDevice | SerialDevice:
|
||||
"""Convert serialx SerialPortInfo to USBDevice or SerialDevice."""
|
||||
if port.vid is not None and port.pid is not None:
|
||||
return usb_device_from_port(port)
|
||||
return serial_device_from_port(port)
|
||||
|
||||
|
||||
def scan_serial_ports() -> Sequence[USBDevice | SerialDevice]:
|
||||
"""Scan serial ports and return USB and other serial devices."""
|
||||
|
||||
# Scan all symlinks first
|
||||
by_id = "/dev/serial/by-id"
|
||||
realpath_to_by_id: dict[str, str] = {}
|
||||
if os.path.isdir(by_id):
|
||||
for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()):
|
||||
realpath_to_by_id[os.path.realpath(path)] = path
|
||||
|
||||
serial_ports = []
|
||||
|
||||
for port in comports():
|
||||
device = usb_serial_device_from_port(port)
|
||||
device_path = realpath_to_by_id.get(port.device, port.device)
|
||||
|
||||
if device_path != port.device:
|
||||
# Prefer the unique /dev/serial/by-id/ path if it exists
|
||||
device = dataclasses.replace(device, device=device_path)
|
||||
|
||||
serial_ports.append(device)
|
||||
|
||||
return serial_ports
|
||||
return [usb_serial_device_from_port(port) for port in list_serial_ports()]
|
||||
|
||||
|
||||
async def async_scan_serial_ports(
|
||||
|
||||
@@ -7,6 +7,7 @@ import asyncio
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable, Coroutine, Iterable, Mapping
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import timedelta
|
||||
import functools
|
||||
import inspect
|
||||
import logging
|
||||
@@ -31,6 +32,7 @@ from homeassistant.const import (
|
||||
CONF_ENABLED,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_EVENT_DATA,
|
||||
CONF_FOR,
|
||||
CONF_ID,
|
||||
CONF_OPTIONS,
|
||||
CONF_PLATFORM,
|
||||
@@ -74,6 +76,7 @@ from .automation import (
|
||||
get_relative_description_key,
|
||||
move_options_fields_to_top_level,
|
||||
)
|
||||
from .event import async_track_same_state
|
||||
from .integration_platform import async_process_integration_platforms
|
||||
from .selector import (
|
||||
NumericThresholdMode,
|
||||
@@ -340,6 +343,7 @@ ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST = ENTITY_STATE_TRIGGER_SCHEMA.extend(
|
||||
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
|
||||
[BEHAVIOR_FIRST, BEHAVIOR_LAST, BEHAVIOR_ANY]
|
||||
),
|
||||
vol.Optional(CONF_FOR): cv.positive_time_period_dict,
|
||||
},
|
||||
}
|
||||
)
|
||||
@@ -368,6 +372,7 @@ class EntityTriggerBase(Trigger):
|
||||
if TYPE_CHECKING:
|
||||
assert config.target is not None
|
||||
self._options = config.options or {}
|
||||
self._duration: timedelta | None = self._options.get(CONF_FOR)
|
||||
self._target = config.target
|
||||
|
||||
def entity_filter(self, entities: set[str]) -> set[str]:
|
||||
@@ -398,16 +403,13 @@ class EntityTriggerBase(Trigger):
|
||||
and state.state not in self._excluded_states
|
||||
)
|
||||
|
||||
def check_one_match(self, entity_ids: set[str]) -> bool:
|
||||
"""Check that only one entity state matches."""
|
||||
return (
|
||||
sum(
|
||||
self.is_valid_state(state)
|
||||
for entity_id in entity_ids
|
||||
if (state := self._hass.states.get(entity_id)) is not None
|
||||
and state.state not in self._excluded_states
|
||||
)
|
||||
== 1
|
||||
def count_matches(self, entity_ids: set[str]) -> int:
|
||||
"""Count the number of entity states that match."""
|
||||
return sum(
|
||||
self.is_valid_state(state)
|
||||
for entity_id in entity_ids
|
||||
if (state := self._hass.states.get(entity_id)) is not None
|
||||
and state.state not in self._excluded_states
|
||||
)
|
||||
|
||||
@override
|
||||
@@ -416,7 +418,8 @@ class EntityTriggerBase(Trigger):
|
||||
) -> CALLBACK_TYPE:
|
||||
"""Attach the trigger to an action runner."""
|
||||
|
||||
behavior = self._options.get(ATTR_BEHAVIOR)
|
||||
behavior: str = self._options.get(ATTR_BEHAVIOR, BEHAVIOR_ANY)
|
||||
unsub_track_same: dict[str, Callable[[], None]] = {}
|
||||
|
||||
@callback
|
||||
def state_change_listener(
|
||||
@@ -428,6 +431,30 @@ class EntityTriggerBase(Trigger):
|
||||
from_state = event.data["old_state"]
|
||||
to_state = event.data["new_state"]
|
||||
|
||||
def state_still_valid(
|
||||
_: str, from_state: State | None, to_state: State | None
|
||||
) -> bool:
|
||||
"""Check if the state is still valid during the duration wait.
|
||||
|
||||
Called by async_track_same_state on each state change to
|
||||
determine whether to cancel the timer.
|
||||
For behavior any, checks the individual entity's state.
|
||||
For behavior first/last, checks the combined state.
|
||||
"""
|
||||
if behavior == BEHAVIOR_LAST:
|
||||
return self.check_all_match(
|
||||
target_state_change_data.targeted_entity_ids
|
||||
)
|
||||
if behavior == BEHAVIOR_FIRST:
|
||||
return (
|
||||
self.count_matches(target_state_change_data.targeted_entity_ids)
|
||||
>= 1
|
||||
)
|
||||
# Behavior any: check the individual entity's state
|
||||
if not to_state:
|
||||
return False
|
||||
return self.is_valid_state(to_state)
|
||||
|
||||
if not from_state or not to_state:
|
||||
return
|
||||
|
||||
@@ -445,25 +472,65 @@ class EntityTriggerBase(Trigger):
|
||||
):
|
||||
return
|
||||
elif behavior == BEHAVIOR_FIRST:
|
||||
if not self.check_one_match(
|
||||
target_state_change_data.targeted_entity_ids
|
||||
# Note: It's enough to test for exactly 1 match here because if there
|
||||
# were previously 2 matches the transition would not be valid and we
|
||||
# would have returned already.
|
||||
if (
|
||||
self.count_matches(target_state_change_data.targeted_entity_ids)
|
||||
!= 1
|
||||
):
|
||||
return
|
||||
|
||||
run_action(
|
||||
{
|
||||
ATTR_ENTITY_ID: entity_id,
|
||||
"from_state": from_state,
|
||||
"to_state": to_state,
|
||||
},
|
||||
f"state of {entity_id}",
|
||||
event.context,
|
||||
@callback
|
||||
def call_action() -> None:
|
||||
"""Call action with right context."""
|
||||
# After a `for` delay, keep the original triggering event payload.
|
||||
# `async_track_same_state` only verifies the state remained valid
|
||||
# for the configured duration before firing the action.
|
||||
run_action(
|
||||
{
|
||||
ATTR_ENTITY_ID: entity_id,
|
||||
"from_state": from_state,
|
||||
"to_state": to_state,
|
||||
"for": self._duration,
|
||||
},
|
||||
f"state of {entity_id}",
|
||||
event.context,
|
||||
)
|
||||
|
||||
if not self._duration:
|
||||
call_action()
|
||||
return
|
||||
|
||||
subscription_key = entity_id if behavior == BEHAVIOR_ANY else behavior
|
||||
if subscription_key in unsub_track_same:
|
||||
unsub_track_same.pop(subscription_key)()
|
||||
unsub_track_same[subscription_key] = async_track_same_state(
|
||||
self._hass,
|
||||
self._duration,
|
||||
call_action,
|
||||
state_still_valid,
|
||||
entity_ids=(
|
||||
entity_id
|
||||
if behavior == BEHAVIOR_ANY
|
||||
else target_state_change_data.targeted_entity_ids
|
||||
),
|
||||
)
|
||||
|
||||
return async_track_target_selector_state_change_event(
|
||||
unsub = async_track_target_selector_state_change_event(
|
||||
self._hass, self._target, state_change_listener, self.entity_filter
|
||||
)
|
||||
|
||||
@callback
|
||||
def async_remove() -> None:
|
||||
"""Remove state listeners async."""
|
||||
unsub()
|
||||
for async_remove in unsub_track_same.values():
|
||||
async_remove()
|
||||
unsub_track_same.clear()
|
||||
|
||||
return async_remove
|
||||
|
||||
|
||||
class EntityTargetStateTriggerBase(EntityTriggerBase):
|
||||
"""Trigger for entity state changes to a specific state.
|
||||
|
||||
@@ -32,7 +32,7 @@ cronsim==2.7
|
||||
cryptography==46.0.7
|
||||
dbus-fast==4.0.4
|
||||
file-read-backwards==2.0.0
|
||||
fnv-hash-fast==2.0.0
|
||||
fnv-hash-fast==2.0.2
|
||||
go2rtc-client==0.4.0
|
||||
ha-ffmpeg==3.2.2
|
||||
habluetooth==6.0.0
|
||||
@@ -57,13 +57,13 @@ PyJWT==2.10.1
|
||||
pymicro-vad==1.0.1
|
||||
PyNaCl==1.6.2
|
||||
pyOpenSSL==26.0.0
|
||||
pyserial==3.5
|
||||
pyspeex-noise==1.0.2
|
||||
python-slugify==8.0.4
|
||||
PyTurboJPEG==1.8.0
|
||||
PyTurboJPEG==1.8.3
|
||||
PyYAML==6.0.3
|
||||
requests==2.33.1
|
||||
securetar==2026.4.1
|
||||
serialx==1.2.2
|
||||
SQLAlchemy==2.0.49
|
||||
standard-aifc==3.13.0
|
||||
standard-telnetlib==3.13.0
|
||||
|
||||
@@ -44,7 +44,7 @@ dependencies = [
|
||||
"certifi>=2021.5.30",
|
||||
"ciso8601==2.3.3",
|
||||
"cronsim==2.7",
|
||||
"fnv-hash-fast==2.0.0",
|
||||
"fnv-hash-fast==2.0.2",
|
||||
# hass-nabucasa is imported by helpers which don't depend on the cloud
|
||||
# integration
|
||||
"hass-nabucasa==2.2.0",
|
||||
|
||||
4
requirements.txt
generated
4
requirements.txt
generated
@@ -22,7 +22,7 @@ certifi>=2021.5.30
|
||||
ciso8601==2.3.3
|
||||
cronsim==2.7
|
||||
cryptography==46.0.7
|
||||
fnv-hash-fast==2.0.0
|
||||
fnv-hash-fast==2.0.2
|
||||
ha-ffmpeg==3.2.2
|
||||
hass-nabucasa==2.2.0
|
||||
hassil==3.5.0
|
||||
@@ -44,7 +44,7 @@ pymicro-vad==1.0.1
|
||||
pyOpenSSL==26.0.0
|
||||
pyspeex-noise==1.0.2
|
||||
python-slugify==8.0.4
|
||||
PyTurboJPEG==1.8.0
|
||||
PyTurboJPEG==1.8.3
|
||||
PyYAML==6.0.3
|
||||
requests==2.33.1
|
||||
securetar==2026.4.1
|
||||
|
||||
16
requirements_all.txt
generated
16
requirements_all.txt
generated
@@ -96,7 +96,7 @@ PyTransportNSW==0.1.1
|
||||
|
||||
# homeassistant.components.camera
|
||||
# homeassistant.components.stream
|
||||
PyTurboJPEG==1.8.0
|
||||
PyTurboJPEG==1.8.3
|
||||
|
||||
# homeassistant.components.vicare
|
||||
PyViCare==2.59.0
|
||||
@@ -1006,7 +1006,7 @@ flux-led==1.2.0
|
||||
|
||||
# homeassistant.components.homekit
|
||||
# homeassistant.components.recorder
|
||||
fnv-hash-fast==2.0.0
|
||||
fnv-hash-fast==2.0.2
|
||||
|
||||
# homeassistant.components.foobot
|
||||
foobot_async==1.0.0
|
||||
@@ -2465,13 +2465,6 @@ pysensibo==1.2.1
|
||||
# homeassistant.components.senz
|
||||
pysenz==1.0.2
|
||||
|
||||
# homeassistant.components.serial
|
||||
pyserial-asyncio-fast==0.16
|
||||
|
||||
# homeassistant.components.acer_projector
|
||||
# homeassistant.components.usb
|
||||
pyserial==3.5
|
||||
|
||||
# homeassistant.components.sesame
|
||||
pysesame2==1.0.1
|
||||
|
||||
@@ -2928,7 +2921,10 @@ sentence-stream==1.2.0
|
||||
# homeassistant.components.sentry
|
||||
sentry-sdk==2.48.0
|
||||
|
||||
# homeassistant.components.acer_projector
|
||||
# homeassistant.components.homeassistant_hardware
|
||||
# homeassistant.components.serial
|
||||
# homeassistant.components.usb
|
||||
# homeassistant.components.zha
|
||||
serialx==1.2.2
|
||||
|
||||
@@ -3163,7 +3159,7 @@ tuya-device-handlers==0.0.17
|
||||
tuya-device-sharing-sdk==0.2.8
|
||||
|
||||
# homeassistant.components.twentemilieu
|
||||
twentemilieu==2.2.1
|
||||
twentemilieu==3.0.0
|
||||
|
||||
# homeassistant.components.twilio
|
||||
twilio==6.32.0
|
||||
|
||||
13
requirements_test_all.txt
generated
13
requirements_test_all.txt
generated
@@ -93,7 +93,7 @@ PyTransportNSW==0.1.1
|
||||
|
||||
# homeassistant.components.camera
|
||||
# homeassistant.components.stream
|
||||
PyTurboJPEG==1.8.0
|
||||
PyTurboJPEG==1.8.3
|
||||
|
||||
# homeassistant.components.vicare
|
||||
PyViCare==2.59.0
|
||||
@@ -894,7 +894,7 @@ flux-led==1.2.0
|
||||
|
||||
# homeassistant.components.homekit
|
||||
# homeassistant.components.recorder
|
||||
fnv-hash-fast==2.0.0
|
||||
fnv-hash-fast==2.0.2
|
||||
|
||||
# homeassistant.components.foobot
|
||||
foobot_async==1.0.0
|
||||
@@ -2109,10 +2109,6 @@ pysensibo==1.2.1
|
||||
# homeassistant.components.senz
|
||||
pysenz==1.0.2
|
||||
|
||||
# homeassistant.components.acer_projector
|
||||
# homeassistant.components.usb
|
||||
pyserial==3.5
|
||||
|
||||
# homeassistant.components.seventeentrack
|
||||
pyseventeentrack==1.1.3
|
||||
|
||||
@@ -2485,7 +2481,10 @@ sentence-stream==1.2.0
|
||||
# homeassistant.components.sentry
|
||||
sentry-sdk==2.48.0
|
||||
|
||||
# homeassistant.components.acer_projector
|
||||
# homeassistant.components.homeassistant_hardware
|
||||
# homeassistant.components.serial
|
||||
# homeassistant.components.usb
|
||||
# homeassistant.components.zha
|
||||
serialx==1.2.2
|
||||
|
||||
@@ -2675,7 +2674,7 @@ tuya-device-handlers==0.0.17
|
||||
tuya-device-sharing-sdk==0.2.8
|
||||
|
||||
# homeassistant.components.twentemilieu
|
||||
twentemilieu==2.2.1
|
||||
twentemilieu==3.0.0
|
||||
|
||||
# homeassistant.components.twilio
|
||||
twilio==6.32.0
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
"""Tests for the HDMI-CEC media player platform."""
|
||||
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
from pycec.const import (
|
||||
@@ -58,39 +57,6 @@ from homeassistant.core import HomeAssistant
|
||||
from . import MockHDMIDevice, assert_key_press_release
|
||||
from .conftest import CecEntityCreator, HDMINetworkCreator
|
||||
|
||||
type AssertState = Callable[[str, str], None]
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
name="assert_state",
|
||||
params=[
|
||||
False,
|
||||
pytest.param(
|
||||
True,
|
||||
marks=pytest.mark.xfail(
|
||||
reason="""State isn't updated because the function is missing the
|
||||
`schedule_update_ha_state` for a correct push entity. Would still
|
||||
update once the data comes back from the device."""
|
||||
),
|
||||
),
|
||||
],
|
||||
ids=["skip_assert_state", "run_assert_state"],
|
||||
)
|
||||
def assert_state_fixture(request: pytest.FixtureRequest) -> AssertState:
|
||||
"""Allow for skipping the assert state changes.
|
||||
|
||||
This is broken in this entity, but we still want to test that
|
||||
the rest of the code works as expected.
|
||||
"""
|
||||
|
||||
def _test_state(state: str, expected: str) -> None:
|
||||
if request.param:
|
||||
assert state == expected
|
||||
else:
|
||||
assert True
|
||||
|
||||
return _test_state
|
||||
|
||||
|
||||
async def test_load_platform(
|
||||
hass: HomeAssistant,
|
||||
@@ -142,7 +108,6 @@ async def test_service_on(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
) -> None:
|
||||
"""Test that media_player triggers on `on` service."""
|
||||
hdmi_network = await create_hdmi_network({"platform": "media_player"})
|
||||
@@ -157,19 +122,17 @@ async def test_service_on(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
mock_hdmi_device.turn_on.assert_called_once_with()
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, STATE_ON)
|
||||
assert state.state == STATE_ON
|
||||
|
||||
|
||||
async def test_service_off(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
) -> None:
|
||||
"""Test that media_player triggers on `off` service."""
|
||||
hdmi_network = await create_hdmi_network({"platform": "media_player"})
|
||||
@@ -188,7 +151,7 @@ async def test_service_off(
|
||||
mock_hdmi_device.turn_off.assert_called_once_with()
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, STATE_OFF)
|
||||
assert state.state == STATE_OFF
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
@@ -317,7 +280,6 @@ async def test_volume_services(
|
||||
data,
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
|
||||
@@ -348,7 +310,6 @@ async def test_track_change_services(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
|
||||
@@ -373,7 +334,6 @@ async def test_playback_services(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
service: str,
|
||||
key: int,
|
||||
expected_state: str,
|
||||
@@ -389,13 +349,12 @@ async def test_playback_services(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, expected_state)
|
||||
assert state.state == expected_state
|
||||
|
||||
|
||||
@pytest.mark.xfail(reason="PLAY feature isn't enabled")
|
||||
@@ -403,7 +362,6 @@ async def test_play_pause_service(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
create_cec_entity: CecEntityCreator,
|
||||
assert_state: AssertState,
|
||||
) -> None:
|
||||
"""Test play pause service."""
|
||||
hdmi_network = await create_hdmi_network({"platform": "media_player"})
|
||||
@@ -418,13 +376,12 @@ async def test_play_pause_service(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 2
|
||||
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=KEY_PAUSE)
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert_state(state.state, STATE_PAUSED)
|
||||
assert state.state == STATE_PAUSED
|
||||
|
||||
await hass.services.async_call(
|
||||
MEDIA_PLAYER_DOMAIN,
|
||||
@@ -432,7 +389,6 @@ async def test_play_pause_service(
|
||||
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
|
||||
blocking=True,
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
assert mock_hdmi_device.send_command.call_count == 4
|
||||
assert_key_press_release(mock_hdmi_device.send_command, 1, dst=3, key=KEY_PLAY)
|
||||
@@ -527,9 +483,6 @@ async def test_starting_state(
|
||||
assert state.state == expected_state
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
reason="The code only sets the state to unavailable, doesn't set the `_attr_available` to false."
|
||||
)
|
||||
async def test_unavailable_status(
|
||||
hass: HomeAssistant,
|
||||
create_hdmi_network: HDMINetworkCreator,
|
||||
@@ -541,6 +494,7 @@ async def test_unavailable_status(
|
||||
await create_cec_entity(hdmi_network, mock_hdmi_device)
|
||||
|
||||
hass.bus.async_fire(EVENT_HDMI_CEC_UNAVAILABLE)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("media_player.hdmi_3")
|
||||
assert state.state == STATE_UNAVAILABLE
|
||||
|
||||
@@ -2634,7 +2634,7 @@ async def help_test_reload_with_config(
|
||||
"""Test reloading with supplied config."""
|
||||
new_yaml_config_file = tmp_path / "configuration.yaml"
|
||||
|
||||
def _write_yaml_config() -> None:
|
||||
def _write_yaml_config() -> str:
|
||||
new_yaml_config = yaml.dump(config)
|
||||
new_yaml_config_file.write_text(new_yaml_config)
|
||||
assert new_yaml_config_file.read_text() == new_yaml_config
|
||||
|
||||
@@ -536,7 +536,6 @@ async def test_loading_subentries(
|
||||
async def test_loading_subentry_with_bad_component_schema(
|
||||
hass: HomeAssistant,
|
||||
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||
mqtt_config_subentries_data: tuple[dict[str, Any]],
|
||||
device_registry: dr.DeviceRegistry,
|
||||
caplog: pytest.LogCaptureFixture,
|
||||
) -> None:
|
||||
@@ -565,10 +564,9 @@ async def test_loading_subentry_with_bad_component_schema(
|
||||
)
|
||||
],
|
||||
)
|
||||
async def test_qos_on_mqt_device_from_subentry(
|
||||
async def test_qos_on_mqtt_device_from_subentry(
|
||||
hass: HomeAssistant,
|
||||
mqtt_mock_entry: MqttMockHAClientGenerator,
|
||||
mqtt_config_subentries_data: tuple[dict[str, Any]],
|
||||
device_registry: dr.DeviceRegistry,
|
||||
) -> None:
|
||||
"""Test QoS is set correctly on entities from MQTT device."""
|
||||
|
||||
@@ -7,6 +7,7 @@ import os
|
||||
from unittest.mock import MagicMock, Mock, call, patch, sentinel
|
||||
|
||||
import pytest
|
||||
from serialx import SerialPortInfo
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components import usb
|
||||
@@ -1296,112 +1297,55 @@ async def test_register_port_event_callback_failure(
|
||||
assert "Failure 2" in caplog.text
|
||||
|
||||
|
||||
async def test_async_scan_serial_ports_with_unique_symlinks(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Test async_scan_serial_ports returns devices with unique /dev/serial/by-id paths."""
|
||||
entry1 = MagicMock(spec_set=os.DirEntry)
|
||||
entry1.is_symlink.return_value = True
|
||||
entry1.path = "/dev/serial/by-id/usb-device1"
|
||||
|
||||
entry2 = MagicMock(spec_set=os.DirEntry)
|
||||
entry2.is_symlink.return_value = True
|
||||
entry2.path = "/dev/serial/by-id/usb-device2"
|
||||
|
||||
mock_port1 = MagicMock()
|
||||
mock_port1.device = "/dev/ttyUSB0"
|
||||
mock_port1.vid = 0x1234
|
||||
mock_port1.pid = 0x5678
|
||||
mock_port1.serial_number = "ABC123"
|
||||
mock_port1.manufacturer = "Test Manufacturer"
|
||||
mock_port1.description = "Test Device"
|
||||
|
||||
mock_port2 = MagicMock()
|
||||
mock_port2.device = "/dev/ttyUSB1"
|
||||
mock_port2.vid = 0xABCD
|
||||
mock_port2.pid = 0xEF01
|
||||
mock_port2.serial_number = "XYZ789"
|
||||
mock_port2.manufacturer = "Another Manufacturer"
|
||||
mock_port2.description = "Another Device"
|
||||
|
||||
def mock_realpath(path: str) -> str:
|
||||
realpath_map = {
|
||||
"/dev/serial/by-id/usb-device1": "/dev/ttyUSB0",
|
||||
"/dev/serial/by-id/usb-device2": "/dev/ttyUSB1",
|
||||
}
|
||||
return realpath_map.get(path, path)
|
||||
|
||||
with (
|
||||
patch("os.path.isdir", return_value=True),
|
||||
patch("os.scandir", return_value=[entry1, entry2]),
|
||||
patch("os.path.realpath", side_effect=mock_realpath),
|
||||
patch(
|
||||
"homeassistant.components.usb.utils.comports",
|
||||
return_value=[mock_port1, mock_port2],
|
||||
),
|
||||
async def test_async_scan_serial_ports(hass: HomeAssistant) -> None:
|
||||
"""Test async_scan_serial_ports parsing."""
|
||||
with patch(
|
||||
"homeassistant.components.usb.utils.list_serial_ports",
|
||||
return_value=[
|
||||
SerialPortInfo(
|
||||
device="/dev/ttyAMA1",
|
||||
resolved_device="/dev/ttyAMA1",
|
||||
vid=None,
|
||||
pid=None,
|
||||
serial_number=None,
|
||||
manufacturer=None,
|
||||
product=None,
|
||||
bcd_device=None,
|
||||
interface_description=None,
|
||||
interface_num=None,
|
||||
),
|
||||
SerialPortInfo(
|
||||
device="/dev/serial/by-id/usb-Nabu_Casa_ZBT-2_10B41DE589FC-if00",
|
||||
resolved_device="/dev/ttyACM0",
|
||||
vid=12346,
|
||||
pid=16385,
|
||||
serial_number="10B41DE589FC",
|
||||
manufacturer="Nabu Casa",
|
||||
product="ZBT-2",
|
||||
bcd_device=257,
|
||||
interface_description="Nabu Casa ZBT-2",
|
||||
interface_num=0,
|
||||
),
|
||||
],
|
||||
):
|
||||
devices = await async_scan_serial_ports(hass)
|
||||
|
||||
assert len(devices) == 2
|
||||
assert devices[0].device == "/dev/serial/by-id/usb-device1"
|
||||
assert devices[0].vid == "1234"
|
||||
assert devices[1].device == "/dev/serial/by-id/usb-device2"
|
||||
assert devices[1].vid == "ABCD"
|
||||
|
||||
|
||||
async def test_async_scan_serial_ports_without_unique_symlinks(
|
||||
hass: HomeAssistant,
|
||||
) -> None:
|
||||
"""Test async_scan_serial_ports returns devices with original paths when no symlinks exist."""
|
||||
mock_port = MagicMock()
|
||||
mock_port.device = "/dev/ttyUSB0"
|
||||
mock_port.vid = 0x1234
|
||||
mock_port.pid = 0x5678
|
||||
mock_port.serial_number = "ABC123"
|
||||
mock_port.manufacturer = "Test Manufacturer"
|
||||
mock_port.description = "Test Device"
|
||||
|
||||
with (
|
||||
patch("os.path.isdir", return_value=False),
|
||||
patch("os.path.realpath", side_effect=lambda x: x),
|
||||
patch(
|
||||
"homeassistant.components.usb.utils.comports",
|
||||
return_value=[mock_port],
|
||||
assert devices == [
|
||||
SerialDevice(
|
||||
device="/dev/ttyAMA1",
|
||||
serial_number=None,
|
||||
manufacturer=None,
|
||||
description=None,
|
||||
),
|
||||
):
|
||||
devices = await async_scan_serial_ports(hass)
|
||||
|
||||
assert len(devices) == 1
|
||||
assert devices[0].device == "/dev/ttyUSB0"
|
||||
assert devices[0].vid == "1234"
|
||||
|
||||
|
||||
async def test_async_scan_serial_ports_no_vid_pid(hass: HomeAssistant) -> None:
|
||||
"""Test async_scan_serial_ports returns devices without VID:PID."""
|
||||
mock_port = MagicMock()
|
||||
mock_port.device = "/dev/ttyAMA1"
|
||||
mock_port.vid = None
|
||||
mock_port.pid = None
|
||||
mock_port.serial_number = None
|
||||
mock_port.manufacturer = None
|
||||
mock_port.description = None
|
||||
|
||||
with (
|
||||
patch("os.path.isdir", return_value=False),
|
||||
patch("os.path.realpath", side_effect=lambda x: x),
|
||||
patch(
|
||||
"homeassistant.components.usb.utils.comports",
|
||||
return_value=[mock_port],
|
||||
USBDevice(
|
||||
device="/dev/serial/by-id/usb-Nabu_Casa_ZBT-2_10B41DE589FC-if00",
|
||||
vid="303A",
|
||||
pid="4001",
|
||||
serial_number="10B41DE589FC",
|
||||
manufacturer="Nabu Casa",
|
||||
description="ZBT-2",
|
||||
),
|
||||
):
|
||||
devices = await async_scan_serial_ports(hass)
|
||||
|
||||
assert len(devices) == 1
|
||||
assert isinstance(devices[0], SerialDevice)
|
||||
assert devices[0].device == "/dev/ttyAMA1"
|
||||
assert devices[0].serial_number is None
|
||||
assert devices[0].manufacturer is None
|
||||
assert devices[0].description is None
|
||||
]
|
||||
|
||||
|
||||
def test_usb_device_from_path_finds_by_symlink() -> None:
|
||||
|
||||
@@ -17,7 +17,6 @@ from unittest.mock import (
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from serial.tools.list_ports_common import ListPortInfo
|
||||
from zha.application.const import RadioType
|
||||
from zigpy.application import ControllerApplication
|
||||
from zigpy.backups import BackupManager
|
||||
@@ -33,7 +32,7 @@ import zigpy.types
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.components.hassio import AddonError, AddonState
|
||||
from homeassistant.components.usb import USBDevice
|
||||
from homeassistant.components.usb import SerialDevice, USBDevice
|
||||
from homeassistant.components.zha import config_flow, radio_manager
|
||||
from homeassistant.components.zha.const import (
|
||||
CONF_BAUDRATE,
|
||||
@@ -66,9 +65,7 @@ from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
|
||||
type RadioPicker = Callable[
|
||||
[RadioType], Coroutine[Any, Any, tuple[ConfigFlowResult, ListPortInfo]]
|
||||
]
|
||||
type RadioPicker = Callable[[RadioType], Coroutine[Any, Any, ConfigFlowResult]]
|
||||
PROBE_FUNCTION_PATH = "zigbee.application.ControllerApplication.probe"
|
||||
|
||||
|
||||
@@ -168,15 +165,14 @@ def mock_detect_radio_type(
|
||||
return detect
|
||||
|
||||
|
||||
def com_port(device="/dev/ttyUSB1234") -> ListPortInfo:
|
||||
def com_port(device="/dev/ttyUSB1234") -> SerialDevice:
|
||||
"""Mock of a serial port."""
|
||||
port = ListPortInfo(device)
|
||||
port.serial_number = "1234"
|
||||
port.manufacturer = "Virtual serial port"
|
||||
port.device = device
|
||||
port.description = "Some serial port"
|
||||
|
||||
return port
|
||||
return SerialDevice(
|
||||
device=device,
|
||||
serial_number="1234",
|
||||
manufacturer="Virtual serial port",
|
||||
description="Some serial port",
|
||||
)
|
||||
|
||||
|
||||
def usb_port(device="/dev/ttyUSB1234") -> USBDevice:
|
||||
@@ -1129,7 +1125,7 @@ async def test_user_flow_not_detected(hass: HomeAssistant) -> None:
|
||||
"""Test user flow, radio not detected."""
|
||||
|
||||
port = com_port()
|
||||
port_select = f"{port}, s/n: {port.serial_number} - {port.manufacturer}"
|
||||
port_select = f"{port.device} - {port.description}, s/n: {port.serial_number} - {port.manufacturer}"
|
||||
|
||||
result = await hass.config_entries.flow.async_init(
|
||||
DOMAIN,
|
||||
@@ -1700,14 +1696,12 @@ def test_prevent_overwrite_ezsp_ieee() -> None:
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def advanced_pick_radio(
|
||||
hass: HomeAssistant,
|
||||
) -> Generator[RadioPicker]:
|
||||
def advanced_pick_radio(hass: HomeAssistant) -> Generator[RadioPicker]:
|
||||
"""Fixture for the first step of the config flow (where a radio is picked)."""
|
||||
|
||||
async def wrapper(radio_type: RadioType) -> tuple[ConfigFlowResult, ListPortInfo]:
|
||||
async def wrapper(radio_type: RadioType) -> ConfigFlowResult:
|
||||
port = com_port()
|
||||
port_select = f"{port}, s/n: {port.serial_number} - {port.manufacturer}"
|
||||
port_select = f"{port.device} - {port.description}, s/n: {port.serial_number} - {port.manufacturer}"
|
||||
|
||||
with patch(
|
||||
"homeassistant.components.zha.radio_manager.ZhaRadioManager.detect_radio_type",
|
||||
|
||||
@@ -4,7 +4,6 @@ from collections.abc import Generator
|
||||
from unittest.mock import AsyncMock, MagicMock, create_autospec, patch
|
||||
|
||||
import pytest
|
||||
import serial.tools.list_ports
|
||||
from zha.application.const import RadioType
|
||||
from zigpy.backups import BackupManager
|
||||
import zigpy.config
|
||||
@@ -77,17 +76,6 @@ def mock_detect_radio_type(
|
||||
return detect
|
||||
|
||||
|
||||
def com_port(device="/dev/ttyUSB1234"):
|
||||
"""Mock of a serial port."""
|
||||
port = serial.tools.list_ports_common.ListPortInfo("/dev/ttyUSB1234")
|
||||
port.serial_number = "1234"
|
||||
port.manufacturer = "Virtual serial port"
|
||||
port.device = device
|
||||
port.description = "Some serial port"
|
||||
|
||||
return port
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_create_zigpy_app() -> Generator[MagicMock]:
|
||||
"""Mock the radio connection."""
|
||||
|
||||
@@ -2,11 +2,13 @@
|
||||
|
||||
from collections.abc import Mapping
|
||||
from contextlib import AbstractContextManager, nullcontext as does_not_raise
|
||||
import datetime
|
||||
import io
|
||||
import logging
|
||||
from typing import Any
|
||||
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, patch
|
||||
|
||||
from freezegun.api import FrozenDateTimeFactory
|
||||
import pytest
|
||||
from pytest_unordered import unordered
|
||||
import voluptuous as vol
|
||||
@@ -20,6 +22,7 @@ from homeassistant.const import (
|
||||
ATTR_DEVICE_CLASS,
|
||||
ATTR_UNIT_OF_MEASUREMENT,
|
||||
CONF_ENTITY_ID,
|
||||
CONF_FOR,
|
||||
CONF_OPTIONS,
|
||||
CONF_PLATFORM,
|
||||
CONF_TARGET,
|
||||
@@ -72,7 +75,13 @@ from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util.unit_conversion import TemperatureConverter
|
||||
from homeassistant.util.yaml.loader import parse_yaml
|
||||
|
||||
from tests.common import MockModule, MockPlatform, mock_integration, mock_platform
|
||||
from tests.common import (
|
||||
MockModule,
|
||||
MockPlatform,
|
||||
async_fire_time_changed,
|
||||
mock_integration,
|
||||
mock_platform,
|
||||
)
|
||||
from tests.typing import WebSocketGenerator
|
||||
|
||||
|
||||
@@ -3110,6 +3119,7 @@ async def _arm_off_to_on_trigger(
|
||||
entity_ids: list[str],
|
||||
behavior: str,
|
||||
calls: list[dict[str, Any]],
|
||||
duration: dict[str, int] | None,
|
||||
) -> CALLBACK_TYPE:
|
||||
"""Set up _OffToOnTrigger via async_initialize_triggers."""
|
||||
|
||||
@@ -3121,10 +3131,14 @@ async def _arm_off_to_on_trigger(
|
||||
mock_integration(hass, MockModule("test"))
|
||||
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
|
||||
|
||||
options: dict[str, Any] = {ATTR_BEHAVIOR: behavior}
|
||||
if duration is not None:
|
||||
options[CONF_FOR] = duration
|
||||
|
||||
trigger_config = {
|
||||
CONF_PLATFORM: "test.off_to_on",
|
||||
CONF_TARGET: {CONF_ENTITY_ID: entity_ids},
|
||||
CONF_OPTIONS: {ATTR_BEHAVIOR: behavior},
|
||||
CONF_OPTIONS: options,
|
||||
}
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -3158,18 +3172,23 @@ def _set_or_remove_state(
|
||||
async def test_entity_trigger_fires_on_valid_transition(
|
||||
hass: HomeAssistant, behavior: str
|
||||
) -> None:
|
||||
"""Test EntityTriggerBase fires on a valid off→on transition."""
|
||||
"""Test EntityTriggerBase fires immediately on a valid off→on transition without duration."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(hass, [entity_id], behavior, calls)
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_id], behavior, calls, duration=None
|
||||
)
|
||||
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_id
|
||||
assert calls[0]["from_state"].state == STATE_OFF
|
||||
assert calls[0]["to_state"].state == STATE_ON
|
||||
assert calls[0]["for"] is None
|
||||
|
||||
# Transition back and trigger again
|
||||
calls.clear()
|
||||
@@ -3197,7 +3216,9 @@ async def test_entity_trigger_from_invalid_initial_state(
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(hass, [entity_id], behavior, calls)
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_id], behavior, calls, duration=None
|
||||
)
|
||||
|
||||
# Transition to "on" from the invalid initial state
|
||||
_set_or_remove_state(hass, entity_id, STATE_ON)
|
||||
@@ -3228,7 +3249,7 @@ async def test_entity_trigger_last_requires_all(
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration=None
|
||||
)
|
||||
|
||||
# Turn only A on — not all match, should not fire
|
||||
@@ -3256,7 +3277,7 @@ async def test_entity_trigger_first_requires_exactly_one(
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration=None
|
||||
)
|
||||
|
||||
# Turn A on — exactly one matches, should fire
|
||||
@@ -3277,7 +3298,7 @@ async def test_entity_trigger_first_requires_exactly_one(
|
||||
[STATE_UNAVAILABLE, STATE_UNKNOWN],
|
||||
ids=["unavailable", "unknown"],
|
||||
)
|
||||
async def test_entity_trigger_last_ignores_unavailable_and_unknownentity(
|
||||
async def test_entity_trigger_last_ignores_unavailable_and_unknown_entity(
|
||||
hass: HomeAssistant, invalid_state: str
|
||||
) -> None:
|
||||
"""Test behavior last: unavailable/unknown entities are excluded from check_all_match.
|
||||
@@ -3297,7 +3318,7 @@ async def test_entity_trigger_last_ignores_unavailable_and_unknownentity(
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b, entity_c], BEHAVIOR_LAST, calls
|
||||
hass, [entity_a, entity_b, entity_c], BEHAVIOR_LAST, calls, duration=None
|
||||
)
|
||||
|
||||
# Turn A on — B is unavailable and skipped, only A is on → all doesn't match
|
||||
@@ -3349,7 +3370,7 @@ async def test_entity_trigger_first_ignores_unavailable_and_unknown_entity(
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b, entity_c], BEHAVIOR_FIRST, calls
|
||||
hass, [entity_a, entity_b, entity_c], BEHAVIOR_FIRST, calls, duration=None
|
||||
)
|
||||
|
||||
# Turn A on — B is unavailable and skipped, only A matches → exactly one
|
||||
@@ -3364,3 +3385,498 @@ async def test_entity_trigger_first_ignores_unavailable_and_unknown_entity(
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
|
||||
async def test_entity_trigger_with_duration(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
|
||||
) -> None:
|
||||
"""Test EntityTriggerBase waits for duration before firing."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_id], behavior, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on — should NOT fire immediately
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Advance time past duration — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_id
|
||||
assert calls[0]["from_state"].state == STATE_OFF
|
||||
assert calls[0]["to_state"].state == STATE_ON
|
||||
assert calls[0]["for"] == datetime.timedelta(seconds=5)
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
|
||||
async def test_entity_trigger_duration_cancelled_on_state_change(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
|
||||
) -> None:
|
||||
"""Test that the duration timer is cancelled if state changes back."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_id], behavior, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on, then back off before duration expires
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past the original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_any_independent(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior any tracks per-entity durations independently."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn A on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Turn B on 2 seconds later
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# After 5s from A's turn-on, A should fire
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_a
|
||||
|
||||
# After 5s from B's turn-on (2 more seconds), B should fire
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 2
|
||||
assert calls[1]["entity_id"] == entity_b
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_any_entity_off_cancels_only_that_entity(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior any: turning off one entity doesn't cancel the other's timer."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn A off after 2 seconds — cancels A's timer but not B's
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# After 5s total, B should fire but A should not
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
assert calls[0]["entity_id"] == entity_b
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_last_requires_all(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior last: trigger fires only when ALL entities are on for duration."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn only A on — should not start timer (not all match)
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Turn B on — now all match, timer starts
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_last_cancelled_when_one_turns_off(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior last: timer is cancelled when one entity turns off."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn A off after 2 seconds
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_last_timer_reset(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior last: timer resets when combined state goes off and back on."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on — combined state "all on", timer starts
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# After 2 seconds, B turns off — combined state breaks, timer cancelled
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# B turns back on — combined state restored, timer restarts
|
||||
freezer.tick(datetime.timedelta(seconds=1))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# 4 seconds after restart (not enough) — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=4))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# 1 more second (5 total from restart) — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=1))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_fires_when_any_on(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: trigger fires when first entity turns on for duration."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn A on — combined state goes to "at least one on", timer starts
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# Advance past duration — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=6))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_not_cancelled_by_second(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: second entity turning on doesn't restart timer."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn A on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn B on 3 seconds later — combined state was already "any on",
|
||||
# so this should NOT restart the timer
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# 2 more seconds (5 total from A) — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_not_cancelled_by_partial_off(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: one entity off doesn't cancel if another is still on."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn A off after 2 seconds — combined state still "any on" (B is on)
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past duration — should still fire (combined state never went to "none on")
|
||||
freezer.tick(datetime.timedelta(seconds=4))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_first_cancelled_when_all_off(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior first: timer cancelled when ALL entities turn off."""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn both on
|
||||
hass.states.async_set(entity_a, STATE_ON)
|
||||
hass.states.async_set(entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Turn both off after 2 seconds — combined state goes to "none on"
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_a, STATE_OFF)
|
||||
hass.states.async_set(entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
async def test_entity_trigger_duration_any_retrigger_resets_timer(
|
||||
hass: HomeAssistant, freezer: FrozenDateTimeFactory
|
||||
) -> None:
|
||||
"""Test behavior any: turning an entity off and on resets its timer."""
|
||||
entity_id = "test.entity_1"
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_id], BEHAVIOR_ANY, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# After 3 seconds, turn off and on again — resets the timer
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
hass.states.async_set(entity_id, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
hass.states.async_set(entity_id, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# 3 more seconds (6 from start, but only 3 from retrigger) — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=3))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 0
|
||||
|
||||
# 2 more seconds (5 from retrigger) — should fire
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == 1
|
||||
|
||||
unsub()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
("behavior", "expected_calls"),
|
||||
[(BEHAVIOR_ANY, 0), (BEHAVIOR_FIRST, 0), (BEHAVIOR_LAST, 1)],
|
||||
)
|
||||
@pytest.mark.parametrize(
|
||||
"invalid_state",
|
||||
[STATE_UNAVAILABLE, STATE_UNKNOWN, None],
|
||||
ids=["unavailable", "unknown", "removed"],
|
||||
)
|
||||
async def test_entity_trigger_duration_cancelled_on_invalid_state(
|
||||
hass: HomeAssistant,
|
||||
freezer: FrozenDateTimeFactory,
|
||||
behavior: str,
|
||||
expected_calls: int,
|
||||
invalid_state: str | None,
|
||||
) -> None:
|
||||
"""Test if the duration timer is cancelled if entity becomes unavailable, unknown, or is removed.
|
||||
|
||||
This is expected to happen in first and any modes, but not in last mode.
|
||||
"""
|
||||
entity_a = "test.entity_a"
|
||||
entity_b = "test.entity_b"
|
||||
_set_or_remove_state(hass, entity_a, STATE_OFF)
|
||||
_set_or_remove_state(hass, entity_b, STATE_OFF)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
calls: list[dict[str, Any]] = []
|
||||
unsub = await _arm_off_to_on_trigger(
|
||||
hass, [entity_a, entity_b], behavior, calls, duration={"seconds": 5}
|
||||
)
|
||||
|
||||
# Turn on the entities needed to start the timer
|
||||
_set_or_remove_state(hass, entity_a, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
if behavior == BEHAVIOR_LAST:
|
||||
_set_or_remove_state(hass, entity_b, STATE_ON)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Entity A becomes invalid during the wait
|
||||
freezer.tick(datetime.timedelta(seconds=2))
|
||||
async_fire_time_changed(hass)
|
||||
_set_or_remove_state(hass, entity_a, invalid_state)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
# Advance past the original duration — should NOT fire
|
||||
freezer.tick(datetime.timedelta(seconds=10))
|
||||
async_fire_time_changed(hass)
|
||||
await hass.async_block_till_done()
|
||||
assert len(calls) == expected_calls
|
||||
|
||||
unsub()
|
||||
|
||||
Reference in New Issue
Block a user