Compare commits

...

11 Commits

Author SHA1 Message Date
dependabot[bot]
dc3778bfbe Bump actions/github-script from 8.0.0 to 9.0.0
Bumps [actions/github-script](https://github.com/actions/github-script) from 8.0.0 to 9.0.0.
- [Release notes](https://github.com/actions/github-script/releases)
- [Commits](ed597411d8...3a2844b7e9)

---
updated-dependencies:
- dependency-name: actions/github-script
  dependency-version: 9.0.0
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-16 06:03:25 +00:00
Erik Montnemery
a9becca321 Add duration to state based entity triggers (#167740) 2026-04-16 07:38:50 +02:00
renovate[bot]
0043a307f0 Update PyTurboJPEG to 1.8.3 (#168329)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-04-16 05:49:04 +02:00
renovate[bot]
dfb1819800 Update fnv-hash-fast to 2.0.2 (#168327)
Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
2026-04-15 17:04:50 -10:00
puddly
12018cf9f4 Migrate remaining Core integrations from pyserial to serialx (#168325) 2026-04-15 22:39:32 -04:00
Franck Nijhof
70368c622e Extend Renovate allowlist with common packages (#168295) 2026-04-15 23:42:32 +02:00
Franck Nijhof
743aef05be Update twentemilieu to 3.0.0 (#168313) 2026-04-15 22:39:42 +02:00
Ariel Ebersberger
49e5b03c08 Migrate hdmi_cec to async (#168306) 2026-04-15 21:51:07 +02:00
Jan Bouwhuis
6bc3fcef36 Fix minor issues in MQTT tests (#168303) 2026-04-15 21:34:44 +02:00
puddly
e3e87185c5 Switch USB integration to list serial ports with serialx (#167615) 2026-04-15 19:22:45 +02:00
epenet
6d83b73cbb Simplify raise-pull-request agent push step (#167739)
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 18:10:31 +01:00
36 changed files with 919 additions and 452 deletions

View File

@@ -186,15 +186,11 @@ If `CHANGE_TYPE` IS "Breaking change" or "Deprecation", keep the `## Breaking ch
## Step 10: Push Branch and Create PR
```bash
# Get branch name and GitHub username
BRANCH=$(git branch --show-current)
PUSH_REMOTE=$(git config "branch.$BRANCH.remote" 2>/dev/null || git remote | head -1)
GITHUB_USER=$(gh api user --jq .login 2>/dev/null || git remote get-url "$PUSH_REMOTE" | sed -E 's#.*[:/]([^/]+)/([^/]+)(\.git)?$#\1#')
Push the branch with upstream tracking, and create a PR against `home-assistant/core` with the generated title and body:
```bash
# Create PR (gh pr create pushes the branch automatically)
gh pr create --repo home-assistant/core --base dev \
--head "$GITHUB_USER:$BRANCH" \
--draft \
--title "TITLE_HERE" \
--body "$(cat <<'EOF'

44
.github/renovate.json vendored
View File

@@ -78,6 +78,50 @@
"enabled": true,
"labels": ["dependency", "core"]
},
{
"description": "Common Python utilities (allowlisted)",
"matchPackageNames": [
"astral",
"atomicwrites-homeassistant",
"audioop-lts",
"awesomeversion",
"bcrypt",
"ciso8601",
"cronsim",
"defusedxml",
"fnv-hash-fast",
"getmac",
"ical",
"ifaddr",
"lru-dict",
"mutagen",
"propcache",
"pyserial",
"python-slugify",
"PyTurboJPEG",
"securetar",
"standard-aifc",
"standard-telnetlib",
"ulid-transform",
"url-normalize",
"xmltodict"
],
"enabled": true,
"labels": ["dependency"]
},
{
"description": "Home Assistant ecosystem packages (core-maintained, no cooldown)",
"matchPackageNames": [
"hassil",
"home-assistant-bluetooth",
"home-assistant-frontend",
"home-assistant-intents",
"infrared-protocols"
],
"enabled": true,
"minimumReleaseAge": null,
"labels": ["dependency", "core"]
},
{
"description": "Test dependencies (allowlisted)",
"matchPackageNames": [

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- name: Check if integration label was added and extract details
id: extract
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
with:
script: |
// Debug: Log the event payload
@@ -118,7 +118,7 @@ jobs:
- name: Fetch similar issues
id: fetch_similar
if: steps.extract.outputs.should_continue == 'true'
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
env:
INTEGRATION_LABELS: ${{ steps.extract.outputs.integration_labels }}
CURRENT_NUMBER: ${{ steps.extract.outputs.current_number }}
@@ -285,7 +285,7 @@ jobs:
- name: Post duplicate detection results
id: post_results
if: steps.extract.outputs.should_continue == 'true' && steps.fetch_similar.outputs.has_similar == 'true'
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
env:
AI_RESPONSE: ${{ steps.ai_detection.outputs.response }}
SIMILAR_ISSUES: ${{ steps.fetch_similar.outputs.similar_issues }}

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- name: Check issue language
id: detect_language
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
env:
ISSUE_NUMBER: ${{ github.event.issue.number }}
ISSUE_TITLE: ${{ github.event.issue.title }}
@@ -95,7 +95,7 @@ jobs:
- name: Process non-English issues
if: steps.detect_language.outputs.should_continue == 'true'
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
env:
AI_RESPONSE: ${{ steps.ai_language_detection.outputs.response }}
ISSUE_NUMBER: ${{ steps.detect_language.outputs.issue_number }}

View File

@@ -22,7 +22,7 @@ jobs:
|| github.event.issue.type.name == 'Opportunity'
steps:
- name: Add no-stale label
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
with:
script: |
await github.rest.issues.addLabels({
@@ -42,7 +42,7 @@ jobs:
if: github.event.issue.type.name == 'Task'
steps:
- name: Check if user is authorized
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
with:
script: |
const issueAuthor = context.payload.issue.user.login;

View File

@@ -6,10 +6,11 @@ from typing import Final
from homeassistant.const import STATE_OFF, STATE_ON
CONF_READ_TIMEOUT: Final = "timeout"
CONF_WRITE_TIMEOUT: Final = "write_timeout"
DEFAULT_NAME: Final = "Acer Projector"
DEFAULT_TIMEOUT: Final = 1
DEFAULT_READ_TIMEOUT: Final = 1
DEFAULT_WRITE_TIMEOUT: Final = 1
ECO_MODE: Final = "ECO Mode"

View File

@@ -5,5 +5,5 @@
"documentation": "https://www.home-assistant.io/integrations/acer_projector",
"iot_class": "local_polling",
"quality_scale": "legacy",
"requirements": ["pyserial==3.5"]
"requirements": ["serialx==1.2.2"]
}

View File

@@ -6,7 +6,7 @@ import logging
import re
from typing import Any
import serial
from serialx import Serial, SerialException
import voluptuous as vol
from homeassistant.components.switch import (
@@ -16,21 +16,22 @@ from homeassistant.components.switch import (
from homeassistant.const import (
CONF_FILENAME,
CONF_NAME,
CONF_TIMEOUT,
STATE_OFF,
STATE_ON,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import (
CMD_DICT,
CONF_READ_TIMEOUT,
CONF_WRITE_TIMEOUT,
DEFAULT_NAME,
DEFAULT_TIMEOUT,
DEFAULT_READ_TIMEOUT,
DEFAULT_WRITE_TIMEOUT,
ECO_MODE,
ICON,
@@ -45,7 +46,7 @@ PLATFORM_SCHEMA = SWITCH_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_FILENAME): cv.isdevice,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(CONF_READ_TIMEOUT, default=DEFAULT_READ_TIMEOUT): cv.positive_int,
vol.Optional(
CONF_WRITE_TIMEOUT, default=DEFAULT_WRITE_TIMEOUT
): cv.positive_int,
@@ -62,10 +63,10 @@ def setup_platform(
"""Connect with serial port and return Acer Projector."""
serial_port = config[CONF_FILENAME]
name = config[CONF_NAME]
timeout = config[CONF_TIMEOUT]
read_timeout = config[CONF_READ_TIMEOUT]
write_timeout = config[CONF_WRITE_TIMEOUT]
add_entities([AcerSwitch(serial_port, name, timeout, write_timeout)], True)
add_entities([AcerSwitch(serial_port, name, read_timeout, write_timeout)], True)
class AcerSwitch(SwitchEntity):
@@ -77,14 +78,14 @@ class AcerSwitch(SwitchEntity):
self,
serial_port: str,
name: str,
timeout: int,
read_timeout: int,
write_timeout: int,
) -> None:
"""Init of the Acer projector."""
self.serial = serial.Serial(
port=serial_port, timeout=timeout, write_timeout=write_timeout
)
self._serial_port = serial_port
self._read_timeout = read_timeout
self._write_timeout = write_timeout
self._attr_name = name
self._attributes = {
LAMP_HOURS: STATE_UNKNOWN,
@@ -94,22 +95,26 @@ class AcerSwitch(SwitchEntity):
def _write_read(self, msg: str) -> str:
"""Write to the projector and read the return."""
ret = ""
# Sometimes the projector won't answer for no reason or the projector
# was disconnected during runtime.
# This way the projector can be reconnected and will still work
try:
if not self.serial.is_open:
self.serial.open()
self.serial.write(msg.encode("utf-8"))
# Size is an experience value there is no real limit.
# AFAIK there is no limit and no end character so we will usually
# need to wait for timeout
ret = self.serial.read_until(size=20).decode("utf-8")
except serial.SerialException:
_LOGGER.error("Problem communicating with %s", self._serial_port)
self.serial.close()
return ret
with Serial.from_url(
self._serial_port,
read_timeout=self._read_timeout,
write_timeout=self._write_timeout,
) as serial:
serial.write(msg.encode("utf-8"))
# Size is an experience value there is no real limit.
# AFAIK there is no limit and no end character so we will usually
# need to wait for timeout
return serial.read_until(size=20).decode("utf-8")
except (OSError, SerialException, TimeoutError) as exc:
raise HomeAssistantError(
f"Problem communicating with {self._serial_port}"
) from exc
def _write_read_format(self, msg: str) -> str:
"""Write msg, obtain answer and format output."""

View File

@@ -7,5 +7,5 @@
"documentation": "https://www.home-assistant.io/integrations/camera",
"integration_type": "entity",
"quality_scale": "internal",
"requirements": ["PyTurboJPEG==1.8.0"]
"requirements": ["PyTurboJPEG==1.8.3"]
}

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from typing import Any
from homeassistant.core import callback
from homeassistant.helpers.entity import Entity
from .const import DOMAIN, EVENT_HDMI_CEC_UNAVAILABLE
@@ -55,9 +56,10 @@ class CecEntity(Entity):
else:
self._attr_name = f"{self._device.type_name} {self._logical_address} ({self._device.osd_name})"
@callback
def _hdmi_cec_unavailable(self, callback_event):
self._attr_available = False
self.schedule_update_ha_state(False)
self.async_write_ha_state()
async def async_added_to_hass(self) -> None:
"""Register HDMI callbacks after initialization."""

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
from pycec.commands import CecCommand, KeyPressCommand, KeyReleaseCommand
from pycec.const import (
@@ -31,7 +30,6 @@ from homeassistant.components.media_player import (
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@@ -45,20 +43,20 @@ _LOGGER = logging.getLogger(__name__)
ENTITY_ID_FORMAT = MP_DOMAIN + ".{}"
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Find and return HDMI devices as +switches."""
"""Find and return HDMI devices as media players."""
if discovery_info and ATTR_NEW in discovery_info:
_LOGGER.debug("Setting up HDMI devices %s", discovery_info[ATTR_NEW])
entities = []
for device in discovery_info[ATTR_NEW]:
hdmi_device = hass.data[DOMAIN][device]
entities.append(CecPlayerEntity(hdmi_device, hdmi_device.logical_address))
add_entities(entities, True)
async_add_entities(entities, True)
class CecPlayerEntity(CecEntity, MediaPlayerEntity):
@@ -79,78 +77,61 @@ class CecPlayerEntity(CecEntity, MediaPlayerEntity):
def send_playback(self, key):
"""Send playback status to CEC adapter."""
self._device.async_send_command(CecCommand(key, dst=self._logical_address))
self._device.send_command(CecCommand(key, dst=self._logical_address))
def mute_volume(self, mute: bool) -> None:
async def async_mute_volume(self, mute: bool) -> None:
"""Mute volume."""
self.send_keypress(KEY_MUTE_TOGGLE)
def media_previous_track(self) -> None:
async def async_media_previous_track(self) -> None:
"""Go to previous track."""
self.send_keypress(KEY_BACKWARD)
def turn_on(self) -> None:
async def async_turn_on(self) -> None:
"""Turn device on."""
self._device.turn_on()
self._attr_state = MediaPlayerState.ON
self.async_write_ha_state()
def clear_playlist(self) -> None:
"""Clear players playlist."""
raise NotImplementedError
def turn_off(self) -> None:
async def async_turn_off(self) -> None:
"""Turn device off."""
self._device.turn_off()
self._attr_state = MediaPlayerState.OFF
self.async_write_ha_state()
def media_stop(self) -> None:
async def async_media_stop(self) -> None:
"""Stop playback."""
self.send_keypress(KEY_STOP)
self._attr_state = MediaPlayerState.IDLE
self.async_write_ha_state()
def play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Not supported."""
raise NotImplementedError
def media_next_track(self) -> None:
async def async_media_next_track(self) -> None:
"""Skip to next track."""
self.send_keypress(KEY_FORWARD)
def media_seek(self, position: float) -> None:
"""Not supported."""
raise NotImplementedError
def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
raise NotImplementedError
def media_pause(self) -> None:
async def async_media_pause(self) -> None:
"""Pause playback."""
self.send_keypress(KEY_PAUSE)
self._attr_state = MediaPlayerState.PAUSED
self.async_write_ha_state()
def select_source(self, source: str) -> None:
"""Not supported."""
raise NotImplementedError
def media_play(self) -> None:
async def async_media_play(self) -> None:
"""Start playback."""
self.send_keypress(KEY_PLAY)
self._attr_state = MediaPlayerState.PLAYING
self.async_write_ha_state()
def volume_up(self) -> None:
async def async_volume_up(self) -> None:
"""Increase volume."""
_LOGGER.debug("%s: volume up", self._logical_address)
self.send_keypress(KEY_VOLUME_UP)
def volume_down(self) -> None:
async def async_volume_down(self) -> None:
"""Decrease volume."""
_LOGGER.debug("%s: volume down", self._logical_address)
self.send_keypress(KEY_VOLUME_DOWN)
def update(self) -> None:
async def async_update(self) -> None:
"""Update device status."""
device = self._device
if device.power_status in [POWER_OFF, 3]:

View File

@@ -20,10 +20,10 @@ _LOGGER = logging.getLogger(__name__)
ENTITY_ID_FORMAT = SWITCH_DOMAIN + ".{}"
def setup_platform(
async def async_setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_entities: AddEntitiesCallback,
async_add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Find and return HDMI devices as switches."""
@@ -33,7 +33,7 @@ def setup_platform(
for device in discovery_info[ATTR_NEW]:
hdmi_device = hass.data[DOMAIN][device]
entities.append(CecSwitchEntity(hdmi_device, hdmi_device.logical_address))
add_entities(entities, True)
async_add_entities(entities, True)
class CecSwitchEntity(CecEntity, SwitchEntity):
@@ -44,19 +44,19 @@ class CecSwitchEntity(CecEntity, SwitchEntity):
CecEntity.__init__(self, device, logical)
self.entity_id = f"{SWITCH_DOMAIN}.hdmi_{hex(self._logical_address)[2:]}"
def turn_on(self, **kwargs: Any) -> None:
async def async_turn_on(self, **kwargs: Any) -> None:
"""Turn device on."""
self._device.turn_on()
self._attr_is_on = True
self.schedule_update_ha_state(force_refresh=False)
self.async_write_ha_state()
def turn_off(self, **kwargs: Any) -> None:
async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn device off."""
self._device.turn_off()
self._attr_is_on = False
self.schedule_update_ha_state(force_refresh=False)
self.async_write_ha_state()
def update(self) -> None:
async def async_update(self) -> None:
"""Update device status."""
device = self._device
if device.power_status in {POWER_OFF, 3}:

View File

@@ -10,7 +10,7 @@
"loggers": ["pyhap"],
"requirements": [
"HAP-python==5.0.0",
"fnv-hash-fast==2.0.0",
"fnv-hash-fast==2.0.2",
"homekit-audio-proxy==1.2.1",
"PyQRCode==1.2.1",
"base36==0.1.1"

View File

@@ -1,7 +1,8 @@
{
"common": {
"condition_behavior_name": "Condition passes if",
"trigger_behavior_name": "Trigger when"
"trigger_behavior_name": "Trigger when",
"trigger_for_name": "For at least"
},
"conditions": {
"is_detected": {
@@ -45,6 +46,9 @@
"fields": {
"behavior": {
"name": "[%key:component::motion::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::motion::common::trigger_for_name%]"
}
},
"name": "Motion cleared"
@@ -54,6 +58,9 @@
"fields": {
"behavior": {
"name": "[%key:component::motion::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::motion::common::trigger_for_name%]"
}
},
"name": "Motion detected"

View File

@@ -9,6 +9,11 @@
- first
- last
- any
for:
required: true
default: 00:00:00
selector:
duration:
detected:
fields: *trigger_common_fields

View File

@@ -8,7 +8,7 @@
"quality_scale": "internal",
"requirements": [
"SQLAlchemy==2.0.49",
"fnv-hash-fast==2.0.0",
"fnv-hash-fast==2.0.2",
"psutil-home-assistant==0.0.1"
]
}

View File

@@ -4,5 +4,5 @@
"codeowners": ["@fabaff"],
"documentation": "https://www.home-assistant.io/integrations/serial",
"iot_class": "local_polling",
"requirements": ["pyserial-asyncio-fast==0.16"]
"requirements": ["serialx==1.2.2"]
}

View File

@@ -3,11 +3,11 @@
from __future__ import annotations
import asyncio
from asyncio import Task
import json
import logging
from serial import SerialException
import serial_asyncio_fast as serial_asyncio
from serialx import Parity, SerialException, StopBits, open_serial_connection
import voluptuous as vol
from homeassistant.components.sensor import (
@@ -18,6 +18,7 @@ from homeassistant.const import CONF_NAME, CONF_VALUE_TEMPLATE, EVENT_HOMEASSIST
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.template import Template
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
_LOGGER = logging.getLogger(__name__)
@@ -33,9 +34,9 @@ CONF_DSRDTR = "dsrdtr"
DEFAULT_NAME = "Serial Sensor"
DEFAULT_BAUDRATE = 9600
DEFAULT_BYTESIZE = serial_asyncio.serial.EIGHTBITS
DEFAULT_PARITY = serial_asyncio.serial.PARITY_NONE
DEFAULT_STOPBITS = serial_asyncio.serial.STOPBITS_ONE
DEFAULT_BYTESIZE = 8
DEFAULT_PARITY = Parity.NONE
DEFAULT_STOPBITS = StopBits.ONE
DEFAULT_XONXOFF = False
DEFAULT_RTSCTS = False
DEFAULT_DSRDTR = False
@@ -46,28 +47,21 @@ PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
vol.Optional(CONF_BAUDRATE, default=DEFAULT_BAUDRATE): cv.positive_int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_BYTESIZE, default=DEFAULT_BYTESIZE): vol.In(
[
serial_asyncio.serial.FIVEBITS,
serial_asyncio.serial.SIXBITS,
serial_asyncio.serial.SEVENBITS,
serial_asyncio.serial.EIGHTBITS,
]
),
vol.Optional(CONF_BYTESIZE, default=DEFAULT_BYTESIZE): vol.In([5, 6, 7, 8]),
vol.Optional(CONF_PARITY, default=DEFAULT_PARITY): vol.In(
[
serial_asyncio.serial.PARITY_NONE,
serial_asyncio.serial.PARITY_EVEN,
serial_asyncio.serial.PARITY_ODD,
serial_asyncio.serial.PARITY_MARK,
serial_asyncio.serial.PARITY_SPACE,
Parity.NONE,
Parity.EVEN,
Parity.ODD,
Parity.MARK,
Parity.SPACE,
]
),
vol.Optional(CONF_STOPBITS, default=DEFAULT_STOPBITS): vol.In(
[
serial_asyncio.serial.STOPBITS_ONE,
serial_asyncio.serial.STOPBITS_ONE_POINT_FIVE,
serial_asyncio.serial.STOPBITS_TWO,
StopBits.ONE,
StopBits.ONE_POINT_FIVE,
StopBits.TWO,
]
),
vol.Optional(CONF_XONXOFF, default=DEFAULT_XONXOFF): cv.boolean,
@@ -84,28 +78,17 @@ async def async_setup_platform(
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Serial sensor platform."""
name = config.get(CONF_NAME)
port = config.get(CONF_SERIAL_PORT)
baudrate = config.get(CONF_BAUDRATE)
bytesize = config.get(CONF_BYTESIZE)
parity = config.get(CONF_PARITY)
stopbits = config.get(CONF_STOPBITS)
xonxoff = config.get(CONF_XONXOFF)
rtscts = config.get(CONF_RTSCTS)
dsrdtr = config.get(CONF_DSRDTR)
value_template = config.get(CONF_VALUE_TEMPLATE)
sensor = SerialSensor(
name,
port,
baudrate,
bytesize,
parity,
stopbits,
xonxoff,
rtscts,
dsrdtr,
value_template,
name=config[CONF_NAME],
port=config[CONF_SERIAL_PORT],
baudrate=config[CONF_BAUDRATE],
bytesize=config[CONF_BYTESIZE],
parity=config[CONF_PARITY],
stopbits=config[CONF_STOPBITS],
xonxoff=config[CONF_XONXOFF],
rtscts=config[CONF_RTSCTS],
dsrdtr=config[CONF_DSRDTR],
value_template=config.get(CONF_VALUE_TEMPLATE),
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, sensor.stop_serial_read)
@@ -119,17 +102,17 @@ class SerialSensor(SensorEntity):
def __init__(
self,
name,
port,
baudrate,
bytesize,
parity,
stopbits,
xonxoff,
rtscts,
dsrdtr,
value_template,
):
name: str,
port: str,
baudrate: int,
bytesize: int,
parity: Parity,
stopbits: StopBits,
xonxoff: bool,
rtscts: bool,
dsrdtr: bool,
value_template: Template | None,
) -> None:
"""Initialize the Serial sensor."""
self._attr_name = name
self._port = port
@@ -140,12 +123,12 @@ class SerialSensor(SensorEntity):
self._xonxoff = xonxoff
self._rtscts = rtscts
self._dsrdtr = dsrdtr
self._serial_loop_task = None
self._serial_loop_task: Task[None] | None = None
self._template = value_template
async def async_added_to_hass(self) -> None:
"""Handle when an entity is about to be added to Home Assistant."""
self._serial_loop_task = self.hass.loop.create_task(
self._serial_loop_task = self.hass.async_create_background_task(
self.serial_read(
self._port,
self._baudrate,
@@ -155,26 +138,31 @@ class SerialSensor(SensorEntity):
self._xonxoff,
self._rtscts,
self._dsrdtr,
)
),
"Serial reader",
)
async def serial_read(
self,
device,
baudrate,
bytesize,
parity,
stopbits,
xonxoff,
rtscts,
dsrdtr,
device: str,
baudrate: int,
bytesize: int,
parity: Parity,
stopbits: StopBits,
xonxoff: bool,
rtscts: bool,
dsrdtr: bool,
**kwargs,
):
"""Read the data from the port."""
logged_error = False
while True:
reader = None
writer = None
try:
reader, _ = await serial_asyncio.open_serial_connection(
reader, writer = await open_serial_connection(
url=device,
baudrate=baudrate,
bytesize=bytesize,
@@ -185,8 +173,7 @@ class SerialSensor(SensorEntity):
dsrdtr=dsrdtr,
**kwargs,
)
except SerialException:
except OSError, SerialException, TimeoutError:
if not logged_error:
_LOGGER.exception(
"Unable to connect to the serial device %s. Will retry", device
@@ -197,15 +184,15 @@ class SerialSensor(SensorEntity):
_LOGGER.debug("Serial device %s connected", device)
while True:
try:
line = await reader.readline()
except SerialException:
line_bytes = await reader.readline()
except OSError, SerialException:
_LOGGER.exception(
"Error while reading serial device %s", device
)
await self._handle_error()
break
else:
line = line.decode("utf-8").strip()
line = line_bytes.decode("utf-8").strip()
try:
data = json.loads(line)
@@ -223,6 +210,10 @@ class SerialSensor(SensorEntity):
_LOGGER.debug("Received: %s", line)
self._attr_native_value = line
self.async_write_ha_state()
finally:
if writer is not None:
writer.close()
await writer.wait_closed()
async def _handle_error(self):
"""Handle error for serial connection."""

View File

@@ -7,5 +7,5 @@
"integration_type": "system",
"iot_class": "local_push",
"quality_scale": "internal",
"requirements": ["PyTurboJPEG==1.8.0", "av==16.0.1", "numpy==2.3.2"]
"requirements": ["PyTurboJPEG==1.8.3", "av==16.0.1", "numpy==2.3.2"]
}

View File

@@ -8,5 +8,5 @@
"iot_class": "cloud_polling",
"loggers": ["twentemilieu"],
"quality_scale": "silver",
"requirements": ["twentemilieu==2.2.1"]
"requirements": ["twentemilieu==3.0.0"]
}

View File

@@ -26,24 +26,20 @@ from homeassistant.core import (
from homeassistant.helpers import config_validation as cv, discovery_flow
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.service_info.usb import UsbServiceInfo as _UsbServiceInfo
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import USBMatcher, async_get_usb
from homeassistant.util.hass_dict import HassKey
from .const import DOMAIN
from .models import (
SerialDevice, # noqa: F401
USBDevice,
)
from .models import SerialDevice, USBDevice
from .utils import (
async_scan_serial_ports,
scan_serial_ports, # noqa: F401
usb_device_from_path, # noqa: F401
usb_device_from_port, # noqa: F401
scan_serial_ports,
usb_device_from_path,
usb_device_matches_matcher,
usb_service_info_from_device,
usb_unique_id_from_service_info, # noqa: F401
usb_unique_id_from_service_info,
)
_LOGGER = logging.getLogger(__name__)
@@ -56,9 +52,17 @@ REQUEST_SCAN_COOLDOWN = 10 # 10 second cooldown
ADD_REMOVE_SCAN_COOLDOWN = 5 # 5 second cooldown to give devices a chance to register
__all__ = [
"SerialDevice",
"USBCallbackMatcher",
"USBDevice",
"async_register_port_event_callback",
"async_register_scan_request_callback",
"async_scan_serial_ports",
"scan_serial_ports",
"usb_device_from_path",
"usb_device_matches_matcher",
"usb_service_info_from_device",
"usb_unique_id_from_service_info",
]
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
@@ -358,7 +362,7 @@ class USBDiscovery:
for matcher in matched:
for flow in self.hass.config_entries.flow.async_progress_by_init_data_type(
_UsbServiceInfo,
UsbServiceInfo,
lambda flow_service_info: flow_service_info == service_info,
):
if matcher["domain"] != flow["handler"]:

View File

@@ -7,5 +7,5 @@
"integration_type": "system",
"iot_class": "local_push",
"quality_scale": "internal",
"requirements": ["aiousbwatcher==1.1.1", "pyserial==3.5"]
"requirements": ["aiousbwatcher==1.1.1", "serialx==1.2.2"]
}

View File

@@ -3,12 +3,10 @@
from __future__ import annotations
from collections.abc import Sequence
import dataclasses
import fnmatch
import os
from serial.tools.list_ports import comports
from serial.tools.list_ports_common import ListPortInfo
from serialx import SerialPortInfo, list_serial_ports
from homeassistant.core import HomeAssistant
from homeassistant.helpers.service_info.usb import UsbServiceInfo
@@ -17,8 +15,8 @@ from homeassistant.loader import USBMatcher
from .models import SerialDevice, USBDevice
def usb_device_from_port(port: ListPortInfo) -> USBDevice:
"""Convert serial ListPortInfo to USBDevice."""
def usb_device_from_port(port: SerialPortInfo) -> USBDevice:
"""Convert serialx SerialPortInfo to USBDevice."""
assert port.vid is not None
assert port.pid is not None
@@ -28,53 +26,30 @@ def usb_device_from_port(port: ListPortInfo) -> USBDevice:
pid=f"{hex(port.pid)[2:]:0>4}".upper(),
serial_number=port.serial_number,
manufacturer=port.manufacturer,
description=port.description,
description=port.product,
)
def serial_device_from_port(port: ListPortInfo) -> SerialDevice:
"""Convert serial ListPortInfo to SerialDevice."""
def serial_device_from_port(port: SerialPortInfo) -> SerialDevice:
"""Convert serialx SerialPortInfo to SerialDevice."""
return SerialDevice(
device=port.device,
serial_number=port.serial_number,
manufacturer=port.manufacturer,
description=port.description,
description=port.product,
)
def usb_serial_device_from_port(port: ListPortInfo) -> USBDevice | SerialDevice:
"""Convert serial ListPortInfo to USBDevice or SerialDevice."""
if port.vid is not None or port.pid is not None:
assert port.vid is not None
assert port.pid is not None
def usb_serial_device_from_port(port: SerialPortInfo) -> USBDevice | SerialDevice:
"""Convert serialx SerialPortInfo to USBDevice or SerialDevice."""
if port.vid is not None and port.pid is not None:
return usb_device_from_port(port)
return serial_device_from_port(port)
def scan_serial_ports() -> Sequence[USBDevice | SerialDevice]:
"""Scan serial ports and return USB and other serial devices."""
# Scan all symlinks first
by_id = "/dev/serial/by-id"
realpath_to_by_id: dict[str, str] = {}
if os.path.isdir(by_id):
for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()):
realpath_to_by_id[os.path.realpath(path)] = path
serial_ports = []
for port in comports():
device = usb_serial_device_from_port(port)
device_path = realpath_to_by_id.get(port.device, port.device)
if device_path != port.device:
# Prefer the unique /dev/serial/by-id/ path if it exists
device = dataclasses.replace(device, device=device_path)
serial_ports.append(device)
return serial_ports
return [usb_serial_device_from_port(port) for port in list_serial_ports()]
async def async_scan_serial_ports(

View File

@@ -7,6 +7,7 @@ import asyncio
from collections import defaultdict
from collections.abc import Callable, Coroutine, Iterable, Mapping
from dataclasses import dataclass, field
from datetime import timedelta
import functools
import inspect
import logging
@@ -31,6 +32,7 @@ from homeassistant.const import (
CONF_ENABLED,
CONF_ENTITY_ID,
CONF_EVENT_DATA,
CONF_FOR,
CONF_ID,
CONF_OPTIONS,
CONF_PLATFORM,
@@ -74,6 +76,7 @@ from .automation import (
get_relative_description_key,
move_options_fields_to_top_level,
)
from .event import async_track_same_state
from .integration_platform import async_process_integration_platforms
from .selector import (
NumericThresholdMode,
@@ -340,6 +343,7 @@ ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST = ENTITY_STATE_TRIGGER_SCHEMA.extend(
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
[BEHAVIOR_FIRST, BEHAVIOR_LAST, BEHAVIOR_ANY]
),
vol.Optional(CONF_FOR): cv.positive_time_period_dict,
},
}
)
@@ -368,6 +372,7 @@ class EntityTriggerBase(Trigger):
if TYPE_CHECKING:
assert config.target is not None
self._options = config.options or {}
self._duration: timedelta | None = self._options.get(CONF_FOR)
self._target = config.target
def entity_filter(self, entities: set[str]) -> set[str]:
@@ -398,16 +403,13 @@ class EntityTriggerBase(Trigger):
and state.state not in self._excluded_states
)
def check_one_match(self, entity_ids: set[str]) -> bool:
"""Check that only one entity state matches."""
return (
sum(
self.is_valid_state(state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
and state.state not in self._excluded_states
)
== 1
def count_matches(self, entity_ids: set[str]) -> int:
"""Count the number of entity states that match."""
return sum(
self.is_valid_state(state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
and state.state not in self._excluded_states
)
@override
@@ -416,7 +418,8 @@ class EntityTriggerBase(Trigger):
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
behavior = self._options.get(ATTR_BEHAVIOR)
behavior: str = self._options.get(ATTR_BEHAVIOR, BEHAVIOR_ANY)
unsub_track_same: dict[str, Callable[[], None]] = {}
@callback
def state_change_listener(
@@ -428,6 +431,30 @@ class EntityTriggerBase(Trigger):
from_state = event.data["old_state"]
to_state = event.data["new_state"]
def state_still_valid(
_: str, from_state: State | None, to_state: State | None
) -> bool:
"""Check if the state is still valid during the duration wait.
Called by async_track_same_state on each state change to
determine whether to cancel the timer.
For behavior any, checks the individual entity's state.
For behavior first/last, checks the combined state.
"""
if behavior == BEHAVIOR_LAST:
return self.check_all_match(
target_state_change_data.targeted_entity_ids
)
if behavior == BEHAVIOR_FIRST:
return (
self.count_matches(target_state_change_data.targeted_entity_ids)
>= 1
)
# Behavior any: check the individual entity's state
if not to_state:
return False
return self.is_valid_state(to_state)
if not from_state or not to_state:
return
@@ -445,25 +472,65 @@ class EntityTriggerBase(Trigger):
):
return
elif behavior == BEHAVIOR_FIRST:
if not self.check_one_match(
target_state_change_data.targeted_entity_ids
# Note: It's enough to test for exactly 1 match here because if there
# were previously 2 matches the transition would not be valid and we
# would have returned already.
if (
self.count_matches(target_state_change_data.targeted_entity_ids)
!= 1
):
return
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"state of {entity_id}",
event.context,
@callback
def call_action() -> None:
"""Call action with right context."""
# After a `for` delay, keep the original triggering event payload.
# `async_track_same_state` only verifies the state remained valid
# for the configured duration before firing the action.
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
"for": self._duration,
},
f"state of {entity_id}",
event.context,
)
if not self._duration:
call_action()
return
subscription_key = entity_id if behavior == BEHAVIOR_ANY else behavior
if subscription_key in unsub_track_same:
unsub_track_same.pop(subscription_key)()
unsub_track_same[subscription_key] = async_track_same_state(
self._hass,
self._duration,
call_action,
state_still_valid,
entity_ids=(
entity_id
if behavior == BEHAVIOR_ANY
else target_state_change_data.targeted_entity_ids
),
)
return async_track_target_selector_state_change_event(
unsub = async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, self.entity_filter
)
@callback
def async_remove() -> None:
"""Remove state listeners async."""
unsub()
for async_remove in unsub_track_same.values():
async_remove()
unsub_track_same.clear()
return async_remove
class EntityTargetStateTriggerBase(EntityTriggerBase):
"""Trigger for entity state changes to a specific state.

View File

@@ -32,7 +32,7 @@ cronsim==2.7
cryptography==46.0.7
dbus-fast==4.0.4
file-read-backwards==2.0.0
fnv-hash-fast==2.0.0
fnv-hash-fast==2.0.2
go2rtc-client==0.4.0
ha-ffmpeg==3.2.2
habluetooth==6.0.0
@@ -57,13 +57,13 @@ PyJWT==2.10.1
pymicro-vad==1.0.1
PyNaCl==1.6.2
pyOpenSSL==26.0.0
pyserial==3.5
pyspeex-noise==1.0.2
python-slugify==8.0.4
PyTurboJPEG==1.8.0
PyTurboJPEG==1.8.3
PyYAML==6.0.3
requests==2.33.1
securetar==2026.4.1
serialx==1.2.2
SQLAlchemy==2.0.49
standard-aifc==3.13.0
standard-telnetlib==3.13.0

View File

@@ -44,7 +44,7 @@ dependencies = [
"certifi>=2021.5.30",
"ciso8601==2.3.3",
"cronsim==2.7",
"fnv-hash-fast==2.0.0",
"fnv-hash-fast==2.0.2",
# hass-nabucasa is imported by helpers which don't depend on the cloud
# integration
"hass-nabucasa==2.2.0",

4
requirements.txt generated
View File

@@ -22,7 +22,7 @@ certifi>=2021.5.30
ciso8601==2.3.3
cronsim==2.7
cryptography==46.0.7
fnv-hash-fast==2.0.0
fnv-hash-fast==2.0.2
ha-ffmpeg==3.2.2
hass-nabucasa==2.2.0
hassil==3.5.0
@@ -44,7 +44,7 @@ pymicro-vad==1.0.1
pyOpenSSL==26.0.0
pyspeex-noise==1.0.2
python-slugify==8.0.4
PyTurboJPEG==1.8.0
PyTurboJPEG==1.8.3
PyYAML==6.0.3
requests==2.33.1
securetar==2026.4.1

16
requirements_all.txt generated
View File

@@ -96,7 +96,7 @@ PyTransportNSW==0.1.1
# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.8.0
PyTurboJPEG==1.8.3
# homeassistant.components.vicare
PyViCare==2.59.0
@@ -1006,7 +1006,7 @@ flux-led==1.2.0
# homeassistant.components.homekit
# homeassistant.components.recorder
fnv-hash-fast==2.0.0
fnv-hash-fast==2.0.2
# homeassistant.components.foobot
foobot_async==1.0.0
@@ -2465,13 +2465,6 @@ pysensibo==1.2.1
# homeassistant.components.senz
pysenz==1.0.2
# homeassistant.components.serial
pyserial-asyncio-fast==0.16
# homeassistant.components.acer_projector
# homeassistant.components.usb
pyserial==3.5
# homeassistant.components.sesame
pysesame2==1.0.1
@@ -2928,7 +2921,10 @@ sentence-stream==1.2.0
# homeassistant.components.sentry
sentry-sdk==2.48.0
# homeassistant.components.acer_projector
# homeassistant.components.homeassistant_hardware
# homeassistant.components.serial
# homeassistant.components.usb
# homeassistant.components.zha
serialx==1.2.2
@@ -3163,7 +3159,7 @@ tuya-device-handlers==0.0.17
tuya-device-sharing-sdk==0.2.8
# homeassistant.components.twentemilieu
twentemilieu==2.2.1
twentemilieu==3.0.0
# homeassistant.components.twilio
twilio==6.32.0

View File

@@ -93,7 +93,7 @@ PyTransportNSW==0.1.1
# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.8.0
PyTurboJPEG==1.8.3
# homeassistant.components.vicare
PyViCare==2.59.0
@@ -894,7 +894,7 @@ flux-led==1.2.0
# homeassistant.components.homekit
# homeassistant.components.recorder
fnv-hash-fast==2.0.0
fnv-hash-fast==2.0.2
# homeassistant.components.foobot
foobot_async==1.0.0
@@ -2109,10 +2109,6 @@ pysensibo==1.2.1
# homeassistant.components.senz
pysenz==1.0.2
# homeassistant.components.acer_projector
# homeassistant.components.usb
pyserial==3.5
# homeassistant.components.seventeentrack
pyseventeentrack==1.1.3
@@ -2485,7 +2481,10 @@ sentence-stream==1.2.0
# homeassistant.components.sentry
sentry-sdk==2.48.0
# homeassistant.components.acer_projector
# homeassistant.components.homeassistant_hardware
# homeassistant.components.serial
# homeassistant.components.usb
# homeassistant.components.zha
serialx==1.2.2
@@ -2675,7 +2674,7 @@ tuya-device-handlers==0.0.17
tuya-device-sharing-sdk==0.2.8
# homeassistant.components.twentemilieu
twentemilieu==2.2.1
twentemilieu==3.0.0
# homeassistant.components.twilio
twilio==6.32.0

View File

@@ -1,6 +1,5 @@
"""Tests for the HDMI-CEC media player platform."""
from collections.abc import Callable
from typing import Any
from pycec.const import (
@@ -58,39 +57,6 @@ from homeassistant.core import HomeAssistant
from . import MockHDMIDevice, assert_key_press_release
from .conftest import CecEntityCreator, HDMINetworkCreator
type AssertState = Callable[[str, str], None]
@pytest.fixture(
name="assert_state",
params=[
False,
pytest.param(
True,
marks=pytest.mark.xfail(
reason="""State isn't updated because the function is missing the
`schedule_update_ha_state` for a correct push entity. Would still
update once the data comes back from the device."""
),
),
],
ids=["skip_assert_state", "run_assert_state"],
)
def assert_state_fixture(request: pytest.FixtureRequest) -> AssertState:
"""Allow for skipping the assert state changes.
This is broken in this entity, but we still want to test that
the rest of the code works as expected.
"""
def _test_state(state: str, expected: str) -> None:
if request.param:
assert state == expected
else:
assert True
return _test_state
async def test_load_platform(
hass: HomeAssistant,
@@ -142,7 +108,6 @@ async def test_service_on(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
) -> None:
"""Test that media_player triggers on `on` service."""
hdmi_network = await create_hdmi_network({"platform": "media_player"})
@@ -157,19 +122,17 @@ async def test_service_on(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
mock_hdmi_device.turn_on.assert_called_once_with()
state = hass.states.get("media_player.hdmi_3")
assert_state(state.state, STATE_ON)
assert state.state == STATE_ON
async def test_service_off(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
) -> None:
"""Test that media_player triggers on `off` service."""
hdmi_network = await create_hdmi_network({"platform": "media_player"})
@@ -188,7 +151,7 @@ async def test_service_off(
mock_hdmi_device.turn_off.assert_called_once_with()
state = hass.states.get("media_player.hdmi_3")
assert_state(state.state, STATE_OFF)
assert state.state == STATE_OFF
@pytest.mark.parametrize(
@@ -317,7 +280,6 @@ async def test_volume_services(
data,
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
@@ -348,7 +310,6 @@ async def test_track_change_services(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
@@ -373,7 +334,6 @@ async def test_playback_services(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
service: str,
key: int,
expected_state: str,
@@ -389,13 +349,12 @@ async def test_playback_services(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
state = hass.states.get("media_player.hdmi_3")
assert_state(state.state, expected_state)
assert state.state == expected_state
@pytest.mark.xfail(reason="PLAY feature isn't enabled")
@@ -403,7 +362,6 @@ async def test_play_pause_service(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
) -> None:
"""Test play pause service."""
hdmi_network = await create_hdmi_network({"platform": "media_player"})
@@ -418,13 +376,12 @@ async def test_play_pause_service(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=KEY_PAUSE)
state = hass.states.get("media_player.hdmi_3")
assert_state(state.state, STATE_PAUSED)
assert state.state == STATE_PAUSED
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
@@ -432,7 +389,6 @@ async def test_play_pause_service(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 4
assert_key_press_release(mock_hdmi_device.send_command, 1, dst=3, key=KEY_PLAY)
@@ -527,9 +483,6 @@ async def test_starting_state(
assert state.state == expected_state
@pytest.mark.xfail(
reason="The code only sets the state to unavailable, doesn't set the `_attr_available` to false."
)
async def test_unavailable_status(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
@@ -541,6 +494,7 @@ async def test_unavailable_status(
await create_cec_entity(hdmi_network, mock_hdmi_device)
hass.bus.async_fire(EVENT_HDMI_CEC_UNAVAILABLE)
await hass.async_block_till_done()
state = hass.states.get("media_player.hdmi_3")
assert state.state == STATE_UNAVAILABLE

View File

@@ -2634,7 +2634,7 @@ async def help_test_reload_with_config(
"""Test reloading with supplied config."""
new_yaml_config_file = tmp_path / "configuration.yaml"
def _write_yaml_config() -> None:
def _write_yaml_config() -> str:
new_yaml_config = yaml.dump(config)
new_yaml_config_file.write_text(new_yaml_config)
assert new_yaml_config_file.read_text() == new_yaml_config

View File

@@ -536,7 +536,6 @@ async def test_loading_subentries(
async def test_loading_subentry_with_bad_component_schema(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
mqtt_config_subentries_data: tuple[dict[str, Any]],
device_registry: dr.DeviceRegistry,
caplog: pytest.LogCaptureFixture,
) -> None:
@@ -565,10 +564,9 @@ async def test_loading_subentry_with_bad_component_schema(
)
],
)
async def test_qos_on_mqt_device_from_subentry(
async def test_qos_on_mqtt_device_from_subentry(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
mqtt_config_subentries_data: tuple[dict[str, Any]],
device_registry: dr.DeviceRegistry,
) -> None:
"""Test QoS is set correctly on entities from MQTT device."""

View File

@@ -7,6 +7,7 @@ import os
from unittest.mock import MagicMock, Mock, call, patch, sentinel
import pytest
from serialx import SerialPortInfo
from homeassistant import config_entries
from homeassistant.components import usb
@@ -1296,112 +1297,55 @@ async def test_register_port_event_callback_failure(
assert "Failure 2" in caplog.text
async def test_async_scan_serial_ports_with_unique_symlinks(
hass: HomeAssistant,
) -> None:
"""Test async_scan_serial_ports returns devices with unique /dev/serial/by-id paths."""
entry1 = MagicMock(spec_set=os.DirEntry)
entry1.is_symlink.return_value = True
entry1.path = "/dev/serial/by-id/usb-device1"
entry2 = MagicMock(spec_set=os.DirEntry)
entry2.is_symlink.return_value = True
entry2.path = "/dev/serial/by-id/usb-device2"
mock_port1 = MagicMock()
mock_port1.device = "/dev/ttyUSB0"
mock_port1.vid = 0x1234
mock_port1.pid = 0x5678
mock_port1.serial_number = "ABC123"
mock_port1.manufacturer = "Test Manufacturer"
mock_port1.description = "Test Device"
mock_port2 = MagicMock()
mock_port2.device = "/dev/ttyUSB1"
mock_port2.vid = 0xABCD
mock_port2.pid = 0xEF01
mock_port2.serial_number = "XYZ789"
mock_port2.manufacturer = "Another Manufacturer"
mock_port2.description = "Another Device"
def mock_realpath(path: str) -> str:
realpath_map = {
"/dev/serial/by-id/usb-device1": "/dev/ttyUSB0",
"/dev/serial/by-id/usb-device2": "/dev/ttyUSB1",
}
return realpath_map.get(path, path)
with (
patch("os.path.isdir", return_value=True),
patch("os.scandir", return_value=[entry1, entry2]),
patch("os.path.realpath", side_effect=mock_realpath),
patch(
"homeassistant.components.usb.utils.comports",
return_value=[mock_port1, mock_port2],
),
async def test_async_scan_serial_ports(hass: HomeAssistant) -> None:
"""Test async_scan_serial_ports parsing."""
with patch(
"homeassistant.components.usb.utils.list_serial_ports",
return_value=[
SerialPortInfo(
device="/dev/ttyAMA1",
resolved_device="/dev/ttyAMA1",
vid=None,
pid=None,
serial_number=None,
manufacturer=None,
product=None,
bcd_device=None,
interface_description=None,
interface_num=None,
),
SerialPortInfo(
device="/dev/serial/by-id/usb-Nabu_Casa_ZBT-2_10B41DE589FC-if00",
resolved_device="/dev/ttyACM0",
vid=12346,
pid=16385,
serial_number="10B41DE589FC",
manufacturer="Nabu Casa",
product="ZBT-2",
bcd_device=257,
interface_description="Nabu Casa ZBT-2",
interface_num=0,
),
],
):
devices = await async_scan_serial_ports(hass)
assert len(devices) == 2
assert devices[0].device == "/dev/serial/by-id/usb-device1"
assert devices[0].vid == "1234"
assert devices[1].device == "/dev/serial/by-id/usb-device2"
assert devices[1].vid == "ABCD"
async def test_async_scan_serial_ports_without_unique_symlinks(
hass: HomeAssistant,
) -> None:
"""Test async_scan_serial_ports returns devices with original paths when no symlinks exist."""
mock_port = MagicMock()
mock_port.device = "/dev/ttyUSB0"
mock_port.vid = 0x1234
mock_port.pid = 0x5678
mock_port.serial_number = "ABC123"
mock_port.manufacturer = "Test Manufacturer"
mock_port.description = "Test Device"
with (
patch("os.path.isdir", return_value=False),
patch("os.path.realpath", side_effect=lambda x: x),
patch(
"homeassistant.components.usb.utils.comports",
return_value=[mock_port],
assert devices == [
SerialDevice(
device="/dev/ttyAMA1",
serial_number=None,
manufacturer=None,
description=None,
),
):
devices = await async_scan_serial_ports(hass)
assert len(devices) == 1
assert devices[0].device == "/dev/ttyUSB0"
assert devices[0].vid == "1234"
async def test_async_scan_serial_ports_no_vid_pid(hass: HomeAssistant) -> None:
"""Test async_scan_serial_ports returns devices without VID:PID."""
mock_port = MagicMock()
mock_port.device = "/dev/ttyAMA1"
mock_port.vid = None
mock_port.pid = None
mock_port.serial_number = None
mock_port.manufacturer = None
mock_port.description = None
with (
patch("os.path.isdir", return_value=False),
patch("os.path.realpath", side_effect=lambda x: x),
patch(
"homeassistant.components.usb.utils.comports",
return_value=[mock_port],
USBDevice(
device="/dev/serial/by-id/usb-Nabu_Casa_ZBT-2_10B41DE589FC-if00",
vid="303A",
pid="4001",
serial_number="10B41DE589FC",
manufacturer="Nabu Casa",
description="ZBT-2",
),
):
devices = await async_scan_serial_ports(hass)
assert len(devices) == 1
assert isinstance(devices[0], SerialDevice)
assert devices[0].device == "/dev/ttyAMA1"
assert devices[0].serial_number is None
assert devices[0].manufacturer is None
assert devices[0].description is None
]
def test_usb_device_from_path_finds_by_symlink() -> None:

View File

@@ -17,7 +17,6 @@ from unittest.mock import (
import uuid
import pytest
from serial.tools.list_ports_common import ListPortInfo
from zha.application.const import RadioType
from zigpy.application import ControllerApplication
from zigpy.backups import BackupManager
@@ -33,7 +32,7 @@ import zigpy.types
from homeassistant import config_entries
from homeassistant.components.hassio import AddonError, AddonState
from homeassistant.components.usb import USBDevice
from homeassistant.components.usb import SerialDevice, USBDevice
from homeassistant.components.zha import config_flow, radio_manager
from homeassistant.components.zha.const import (
CONF_BAUDRATE,
@@ -66,9 +65,7 @@ from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from tests.common import MockConfigEntry
type RadioPicker = Callable[
[RadioType], Coroutine[Any, Any, tuple[ConfigFlowResult, ListPortInfo]]
]
type RadioPicker = Callable[[RadioType], Coroutine[Any, Any, ConfigFlowResult]]
PROBE_FUNCTION_PATH = "zigbee.application.ControllerApplication.probe"
@@ -168,15 +165,14 @@ def mock_detect_radio_type(
return detect
def com_port(device="/dev/ttyUSB1234") -> ListPortInfo:
def com_port(device="/dev/ttyUSB1234") -> SerialDevice:
"""Mock of a serial port."""
port = ListPortInfo(device)
port.serial_number = "1234"
port.manufacturer = "Virtual serial port"
port.device = device
port.description = "Some serial port"
return port
return SerialDevice(
device=device,
serial_number="1234",
manufacturer="Virtual serial port",
description="Some serial port",
)
def usb_port(device="/dev/ttyUSB1234") -> USBDevice:
@@ -1129,7 +1125,7 @@ async def test_user_flow_not_detected(hass: HomeAssistant) -> None:
"""Test user flow, radio not detected."""
port = com_port()
port_select = f"{port}, s/n: {port.serial_number} - {port.manufacturer}"
port_select = f"{port.device} - {port.description}, s/n: {port.serial_number} - {port.manufacturer}"
result = await hass.config_entries.flow.async_init(
DOMAIN,
@@ -1700,14 +1696,12 @@ def test_prevent_overwrite_ezsp_ieee() -> None:
@pytest.fixture
def advanced_pick_radio(
hass: HomeAssistant,
) -> Generator[RadioPicker]:
def advanced_pick_radio(hass: HomeAssistant) -> Generator[RadioPicker]:
"""Fixture for the first step of the config flow (where a radio is picked)."""
async def wrapper(radio_type: RadioType) -> tuple[ConfigFlowResult, ListPortInfo]:
async def wrapper(radio_type: RadioType) -> ConfigFlowResult:
port = com_port()
port_select = f"{port}, s/n: {port.serial_number} - {port.manufacturer}"
port_select = f"{port.device} - {port.description}, s/n: {port.serial_number} - {port.manufacturer}"
with patch(
"homeassistant.components.zha.radio_manager.ZhaRadioManager.detect_radio_type",

View File

@@ -4,7 +4,6 @@ from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, create_autospec, patch
import pytest
import serial.tools.list_ports
from zha.application.const import RadioType
from zigpy.backups import BackupManager
import zigpy.config
@@ -77,17 +76,6 @@ def mock_detect_radio_type(
return detect
def com_port(device="/dev/ttyUSB1234"):
"""Mock of a serial port."""
port = serial.tools.list_ports_common.ListPortInfo("/dev/ttyUSB1234")
port.serial_number = "1234"
port.manufacturer = "Virtual serial port"
port.device = device
port.description = "Some serial port"
return port
@pytest.fixture
def mock_create_zigpy_app() -> Generator[MagicMock]:
"""Mock the radio connection."""

View File

@@ -2,11 +2,13 @@
from collections.abc import Mapping
from contextlib import AbstractContextManager, nullcontext as does_not_raise
import datetime
import io
import logging
from typing import Any
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from pytest_unordered import unordered
import voluptuous as vol
@@ -20,6 +22,7 @@ from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_UNIT_OF_MEASUREMENT,
CONF_ENTITY_ID,
CONF_FOR,
CONF_OPTIONS,
CONF_PLATFORM,
CONF_TARGET,
@@ -72,7 +75,13 @@ from homeassistant.setup import async_setup_component
from homeassistant.util.unit_conversion import TemperatureConverter
from homeassistant.util.yaml.loader import parse_yaml
from tests.common import MockModule, MockPlatform, mock_integration, mock_platform
from tests.common import (
MockModule,
MockPlatform,
async_fire_time_changed,
mock_integration,
mock_platform,
)
from tests.typing import WebSocketGenerator
@@ -3110,6 +3119,7 @@ async def _arm_off_to_on_trigger(
entity_ids: list[str],
behavior: str,
calls: list[dict[str, Any]],
duration: dict[str, int] | None,
) -> CALLBACK_TYPE:
"""Set up _OffToOnTrigger via async_initialize_triggers."""
@@ -3121,10 +3131,14 @@ async def _arm_off_to_on_trigger(
mock_integration(hass, MockModule("test"))
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
options: dict[str, Any] = {ATTR_BEHAVIOR: behavior}
if duration is not None:
options[CONF_FOR] = duration
trigger_config = {
CONF_PLATFORM: "test.off_to_on",
CONF_TARGET: {CONF_ENTITY_ID: entity_ids},
CONF_OPTIONS: {ATTR_BEHAVIOR: behavior},
CONF_OPTIONS: options,
}
log = logging.getLogger(__name__)
@@ -3158,18 +3172,23 @@ def _set_or_remove_state(
async def test_entity_trigger_fires_on_valid_transition(
hass: HomeAssistant, behavior: str
) -> None:
"""Test EntityTriggerBase fires on a valid off→on transition."""
"""Test EntityTriggerBase fires immediately on a valid off→on transition without duration."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(hass, [entity_id], behavior, calls)
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration=None
)
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_id
assert calls[0]["from_state"].state == STATE_OFF
assert calls[0]["to_state"].state == STATE_ON
assert calls[0]["for"] is None
# Transition back and trigger again
calls.clear()
@@ -3197,7 +3216,9 @@ async def test_entity_trigger_from_invalid_initial_state(
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(hass, [entity_id], behavior, calls)
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration=None
)
# Transition to "on" from the invalid initial state
_set_or_remove_state(hass, entity_id, STATE_ON)
@@ -3228,7 +3249,7 @@ async def test_entity_trigger_last_requires_all(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration=None
)
# Turn only A on — not all match, should not fire
@@ -3256,7 +3277,7 @@ async def test_entity_trigger_first_requires_exactly_one(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration=None
)
# Turn A on — exactly one matches, should fire
@@ -3277,7 +3298,7 @@ async def test_entity_trigger_first_requires_exactly_one(
[STATE_UNAVAILABLE, STATE_UNKNOWN],
ids=["unavailable", "unknown"],
)
async def test_entity_trigger_last_ignores_unavailable_and_unknownentity(
async def test_entity_trigger_last_ignores_unavailable_and_unknown_entity(
hass: HomeAssistant, invalid_state: str
) -> None:
"""Test behavior last: unavailable/unknown entities are excluded from check_all_match.
@@ -3297,7 +3318,7 @@ async def test_entity_trigger_last_ignores_unavailable_and_unknownentity(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b, entity_c], BEHAVIOR_LAST, calls
hass, [entity_a, entity_b, entity_c], BEHAVIOR_LAST, calls, duration=None
)
# Turn A on — B is unavailable and skipped, only A is on → all doesn't match
@@ -3349,7 +3370,7 @@ async def test_entity_trigger_first_ignores_unavailable_and_unknown_entity(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b, entity_c], BEHAVIOR_FIRST, calls
hass, [entity_a, entity_b, entity_c], BEHAVIOR_FIRST, calls, duration=None
)
# Turn A on — B is unavailable and skipped, only A matches → exactly one
@@ -3364,3 +3385,498 @@ async def test_entity_trigger_first_ignores_unavailable_and_unknown_entity(
assert len(calls) == 1
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
async def test_entity_trigger_with_duration(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test EntityTriggerBase waits for duration before firing."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
# Turn on — should NOT fire immediately
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Advance time past duration — should fire
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_id
assert calls[0]["from_state"].state == STATE_OFF
assert calls[0]["to_state"].state == STATE_ON
assert calls[0]["for"] == datetime.timedelta(seconds=5)
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
async def test_entity_trigger_duration_cancelled_on_state_change(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test that the duration timer is cancelled if state changes back."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
# Turn on, then back off before duration expires
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
# Advance past the original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_any_independent(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any tracks per-entity durations independently."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn A on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Turn B on 2 seconds later
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# After 5s from A's turn-on, A should fire
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_a
# After 5s from B's turn-on (2 more seconds), B should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 2
assert calls[1]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_duration_any_entity_off_cancels_only_that_entity(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any: turning off one entity doesn't cancel the other's timer."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds — cancels A's timer but not B's
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# After 5s total, B should fire but A should not
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_duration_last_requires_all(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: trigger fires only when ALL entities are on for duration."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn only A on — should not start timer (not all match)
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# Turn B on — now all match, timer starts
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_last_cancelled_when_one_turns_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: timer is cancelled when one entity turns off."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_last_timer_reset(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: timer resets when combined state goes off and back on."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn both on — combined state "all on", timer starts
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# After 2 seconds, B turns off — combined state breaks, timer cancelled
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
# B turns back on — combined state restored, timer restarts
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# 4 seconds after restart (not enough) — should NOT fire
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# 1 more second (5 total from restart) — should fire
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_fires_when_any_on(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: trigger fires when first entity turns on for duration."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn A on — combined state goes to "at least one on", timer starts
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Advance past duration — should fire
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_not_cancelled_by_second(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: second entity turning on doesn't restart timer."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn A on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
# Turn B on 3 seconds later — combined state was already "any on",
# so this should NOT restart the timer
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# 2 more seconds (5 total from A) — should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_not_cancelled_by_partial_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: one entity off doesn't cancel if another is still on."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds — combined state still "any on" (B is on)
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past duration — should still fire (combined state never went to "none on")
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_cancelled_when_all_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: timer cancelled when ALL entities turn off."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn both off after 2 seconds — combined state goes to "none on"
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
# Advance past original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_any_retrigger_resets_timer(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any: turning an entity off and on resets its timer."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn on
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# After 3 seconds, turn off and on again — resets the timer
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# 3 more seconds (6 from start, but only 3 from retrigger) — should NOT fire
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# 2 more seconds (5 from retrigger) — should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
@pytest.mark.parametrize(
("behavior", "expected_calls"),
[(BEHAVIOR_ANY, 0), (BEHAVIOR_FIRST, 0), (BEHAVIOR_LAST, 1)],
)
@pytest.mark.parametrize(
"invalid_state",
[STATE_UNAVAILABLE, STATE_UNKNOWN, None],
ids=["unavailable", "unknown", "removed"],
)
async def test_entity_trigger_duration_cancelled_on_invalid_state(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
behavior: str,
expected_calls: int,
invalid_state: str | None,
) -> None:
"""Test if the duration timer is cancelled if entity becomes unavailable, unknown, or is removed.
This is expected to happen in first and any modes, but not in last mode.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
_set_or_remove_state(hass, entity_a, STATE_OFF)
_set_or_remove_state(hass, entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], behavior, calls, duration={"seconds": 5}
)
# Turn on the entities needed to start the timer
_set_or_remove_state(hass, entity_a, STATE_ON)
await hass.async_block_till_done()
if behavior == BEHAVIOR_LAST:
_set_or_remove_state(hass, entity_b, STATE_ON)
await hass.async_block_till_done()
# Entity A becomes invalid during the wait
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
_set_or_remove_state(hass, entity_a, invalid_state)
await hass.async_block_till_done()
# Advance past the original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == expected_calls
unsub()