Compare commits

..

2 Commits

Author SHA1 Message Date
Ariel Ebersberger
e937b09652 Merge branch 'dev' into python-3.14.3 2026-04-15 19:04:03 +02:00
Jan Čermák
427faf4854 Bump base image to 2026.02.0 with Python 3.14.3, use 3.14.3 in CI
This also bumps libcec used in the base image to 7.1.1, full changelog:
* https://github.com/home-assistant/docker/releases/tag/2026.02.0

Python changelog:
* https://docs.python.org/release/3.14.3/whatsnew/changelog.html
2026-04-10 10:23:28 +02:00
38 changed files with 454 additions and 921 deletions

View File

@@ -186,11 +186,15 @@ If `CHANGE_TYPE` IS "Breaking change" or "Deprecation", keep the `## Breaking ch
## Step 10: Push Branch and Create PR
Push the branch with upstream tracking, and create a PR against `home-assistant/core` with the generated title and body:
```bash
# Get branch name and GitHub username
BRANCH=$(git branch --show-current)
PUSH_REMOTE=$(git config "branch.$BRANCH.remote" 2>/dev/null || git remote | head -1)
GITHUB_USER=$(gh api user --jq .login 2>/dev/null || git remote get-url "$PUSH_REMOTE" | sed -E 's#.*[:/]([^/]+)/([^/]+)(\.git)?$#\1#')
# Create PR (gh pr create pushes the branch automatically)
gh pr create --repo home-assistant/core --base dev \
--head "$GITHUB_USER:$BRANCH" \
--draft \
--title "TITLE_HERE" \
--body "$(cat <<'EOF'

44
.github/renovate.json vendored
View File

@@ -78,50 +78,6 @@
"enabled": true,
"labels": ["dependency", "core"]
},
{
"description": "Common Python utilities (allowlisted)",
"matchPackageNames": [
"astral",
"atomicwrites-homeassistant",
"audioop-lts",
"awesomeversion",
"bcrypt",
"ciso8601",
"cronsim",
"defusedxml",
"fnv-hash-fast",
"getmac",
"ical",
"ifaddr",
"lru-dict",
"mutagen",
"propcache",
"pyserial",
"python-slugify",
"PyTurboJPEG",
"securetar",
"standard-aifc",
"standard-telnetlib",
"ulid-transform",
"url-normalize",
"xmltodict"
],
"enabled": true,
"labels": ["dependency"]
},
{
"description": "Home Assistant ecosystem packages (core-maintained, no cooldown)",
"matchPackageNames": [
"hassil",
"home-assistant-bluetooth",
"home-assistant-frontend",
"home-assistant-intents",
"infrared-protocols"
],
"enabled": true,
"minimumReleaseAge": null,
"labels": ["dependency", "core"]
},
{
"description": "Test dependencies (allowlisted)",
"matchPackageNames": [

View File

@@ -14,7 +14,7 @@ env:
UV_HTTP_TIMEOUT: 60
UV_SYSTEM_PYTHON: "true"
# Base image version from https://github.com/home-assistant/docker
BASE_IMAGE_VERSION: "2026.01.0"
BASE_IMAGE_VERSION: "2026.02.0"
ARCHITECTURES: '["amd64", "aarch64"]'
permissions: {}

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- name: Check if integration label was added and extract details
id: extract
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
with:
script: |
// Debug: Log the event payload
@@ -118,7 +118,7 @@ jobs:
- name: Fetch similar issues
id: fetch_similar
if: steps.extract.outputs.should_continue == 'true'
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
env:
INTEGRATION_LABELS: ${{ steps.extract.outputs.integration_labels }}
CURRENT_NUMBER: ${{ steps.extract.outputs.current_number }}
@@ -285,7 +285,7 @@ jobs:
- name: Post duplicate detection results
id: post_results
if: steps.extract.outputs.should_continue == 'true' && steps.fetch_similar.outputs.has_similar == 'true'
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
env:
AI_RESPONSE: ${{ steps.ai_detection.outputs.response }}
SIMILAR_ISSUES: ${{ steps.fetch_similar.outputs.similar_issues }}

View File

@@ -21,7 +21,7 @@ jobs:
steps:
- name: Check issue language
id: detect_language
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
env:
ISSUE_NUMBER: ${{ github.event.issue.number }}
ISSUE_TITLE: ${{ github.event.issue.title }}
@@ -95,7 +95,7 @@ jobs:
- name: Process non-English issues
if: steps.detect_language.outputs.should_continue == 'true'
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
env:
AI_RESPONSE: ${{ steps.ai_language_detection.outputs.response }}
ISSUE_NUMBER: ${{ steps.detect_language.outputs.issue_number }}

View File

@@ -22,7 +22,7 @@ jobs:
|| github.event.issue.type.name == 'Opportunity'
steps:
- name: Add no-stale label
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
with:
script: |
await github.rest.issues.addLabels({
@@ -42,7 +42,7 @@ jobs:
if: github.event.issue.type.name == 'Task'
steps:
- name: Check if user is authorized
uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 # v9.0.0
uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd # v8.0.0
with:
script: |
const issueAuthor = context.payload.issue.user.login;

View File

@@ -1 +1 @@
3.14.2
3.14.3

View File

@@ -6,11 +6,10 @@ from typing import Final
from homeassistant.const import STATE_OFF, STATE_ON
CONF_READ_TIMEOUT: Final = "timeout"
CONF_WRITE_TIMEOUT: Final = "write_timeout"
DEFAULT_NAME: Final = "Acer Projector"
DEFAULT_READ_TIMEOUT: Final = 1
DEFAULT_TIMEOUT: Final = 1
DEFAULT_WRITE_TIMEOUT: Final = 1
ECO_MODE: Final = "ECO Mode"

View File

@@ -5,5 +5,5 @@
"documentation": "https://www.home-assistant.io/integrations/acer_projector",
"iot_class": "local_polling",
"quality_scale": "legacy",
"requirements": ["serialx==1.2.2"]
"requirements": ["pyserial==3.5"]
}

View File

@@ -6,7 +6,7 @@ import logging
import re
from typing import Any
from serialx import Serial, SerialException
import serial
import voluptuous as vol
from homeassistant.components.switch import (
@@ -16,22 +16,21 @@ from homeassistant.components.switch import (
from homeassistant.const import (
CONF_FILENAME,
CONF_NAME,
CONF_TIMEOUT,
STATE_OFF,
STATE_ON,
STATE_UNKNOWN,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import (
CMD_DICT,
CONF_READ_TIMEOUT,
CONF_WRITE_TIMEOUT,
DEFAULT_NAME,
DEFAULT_READ_TIMEOUT,
DEFAULT_TIMEOUT,
DEFAULT_WRITE_TIMEOUT,
ECO_MODE,
ICON,
@@ -46,7 +45,7 @@ PLATFORM_SCHEMA = SWITCH_PLATFORM_SCHEMA.extend(
{
vol.Required(CONF_FILENAME): cv.isdevice,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_READ_TIMEOUT, default=DEFAULT_READ_TIMEOUT): cv.positive_int,
vol.Optional(CONF_TIMEOUT, default=DEFAULT_TIMEOUT): cv.positive_int,
vol.Optional(
CONF_WRITE_TIMEOUT, default=DEFAULT_WRITE_TIMEOUT
): cv.positive_int,
@@ -63,10 +62,10 @@ def setup_platform(
"""Connect with serial port and return Acer Projector."""
serial_port = config[CONF_FILENAME]
name = config[CONF_NAME]
read_timeout = config[CONF_READ_TIMEOUT]
timeout = config[CONF_TIMEOUT]
write_timeout = config[CONF_WRITE_TIMEOUT]
add_entities([AcerSwitch(serial_port, name, read_timeout, write_timeout)], True)
add_entities([AcerSwitch(serial_port, name, timeout, write_timeout)], True)
class AcerSwitch(SwitchEntity):
@@ -78,14 +77,14 @@ class AcerSwitch(SwitchEntity):
self,
serial_port: str,
name: str,
read_timeout: int,
timeout: int,
write_timeout: int,
) -> None:
"""Init of the Acer projector."""
self.serial = serial.Serial(
port=serial_port, timeout=timeout, write_timeout=write_timeout
)
self._serial_port = serial_port
self._read_timeout = read_timeout
self._write_timeout = write_timeout
self._attr_name = name
self._attributes = {
LAMP_HOURS: STATE_UNKNOWN,
@@ -95,26 +94,22 @@ class AcerSwitch(SwitchEntity):
def _write_read(self, msg: str) -> str:
"""Write to the projector and read the return."""
ret = ""
# Sometimes the projector won't answer for no reason or the projector
# was disconnected during runtime.
# This way the projector can be reconnected and will still work
try:
with Serial.from_url(
self._serial_port,
read_timeout=self._read_timeout,
write_timeout=self._write_timeout,
) as serial:
serial.write(msg.encode("utf-8"))
# Size is an experience value there is no real limit.
# AFAIK there is no limit and no end character so we will usually
# need to wait for timeout
return serial.read_until(size=20).decode("utf-8")
except (OSError, SerialException, TimeoutError) as exc:
raise HomeAssistantError(
f"Problem communicating with {self._serial_port}"
) from exc
if not self.serial.is_open:
self.serial.open()
self.serial.write(msg.encode("utf-8"))
# Size is an experience value there is no real limit.
# AFAIK there is no limit and no end character so we will usually
# need to wait for timeout
ret = self.serial.read_until(size=20).decode("utf-8")
except serial.SerialException:
_LOGGER.error("Problem communicating with %s", self._serial_port)
self.serial.close()
return ret
def _write_read_format(self, msg: str) -> str:
"""Write msg, obtain answer and format output."""

View File

@@ -7,5 +7,5 @@
"documentation": "https://www.home-assistant.io/integrations/camera",
"integration_type": "entity",
"quality_scale": "internal",
"requirements": ["PyTurboJPEG==1.8.3"]
"requirements": ["PyTurboJPEG==1.8.0"]
}

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from typing import Any
from homeassistant.core import callback
from homeassistant.helpers.entity import Entity
from .const import DOMAIN, EVENT_HDMI_CEC_UNAVAILABLE
@@ -56,10 +55,9 @@ class CecEntity(Entity):
else:
self._attr_name = f"{self._device.type_name} {self._logical_address} ({self._device.osd_name})"
@callback
def _hdmi_cec_unavailable(self, callback_event):
self._attr_available = False
self.async_write_ha_state()
self.schedule_update_ha_state(False)
async def async_added_to_hass(self) -> None:
"""Register HDMI callbacks after initialization."""

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import logging
from typing import Any
from pycec.commands import CecCommand, KeyPressCommand, KeyReleaseCommand
from pycec.const import (
@@ -30,6 +31,7 @@ from homeassistant.components.media_player import (
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@@ -43,20 +45,20 @@ _LOGGER = logging.getLogger(__name__)
ENTITY_ID_FORMAT = MP_DOMAIN + ".{}"
async def async_setup_platform(
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Find and return HDMI devices as media players."""
"""Find and return HDMI devices as +switches."""
if discovery_info and ATTR_NEW in discovery_info:
_LOGGER.debug("Setting up HDMI devices %s", discovery_info[ATTR_NEW])
entities = []
for device in discovery_info[ATTR_NEW]:
hdmi_device = hass.data[DOMAIN][device]
entities.append(CecPlayerEntity(hdmi_device, hdmi_device.logical_address))
async_add_entities(entities, True)
add_entities(entities, True)
class CecPlayerEntity(CecEntity, MediaPlayerEntity):
@@ -77,61 +79,78 @@ class CecPlayerEntity(CecEntity, MediaPlayerEntity):
def send_playback(self, key):
"""Send playback status to CEC adapter."""
self._device.send_command(CecCommand(key, dst=self._logical_address))
self._device.async_send_command(CecCommand(key, dst=self._logical_address))
async def async_mute_volume(self, mute: bool) -> None:
def mute_volume(self, mute: bool) -> None:
"""Mute volume."""
self.send_keypress(KEY_MUTE_TOGGLE)
async def async_media_previous_track(self) -> None:
def media_previous_track(self) -> None:
"""Go to previous track."""
self.send_keypress(KEY_BACKWARD)
async def async_turn_on(self) -> None:
def turn_on(self) -> None:
"""Turn device on."""
self._device.turn_on()
self._attr_state = MediaPlayerState.ON
self.async_write_ha_state()
async def async_turn_off(self) -> None:
def clear_playlist(self) -> None:
"""Clear players playlist."""
raise NotImplementedError
def turn_off(self) -> None:
"""Turn device off."""
self._device.turn_off()
self._attr_state = MediaPlayerState.OFF
self.async_write_ha_state()
async def async_media_stop(self) -> None:
def media_stop(self) -> None:
"""Stop playback."""
self.send_keypress(KEY_STOP)
self._attr_state = MediaPlayerState.IDLE
self.async_write_ha_state()
async def async_media_next_track(self) -> None:
def play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Not supported."""
raise NotImplementedError
def media_next_track(self) -> None:
"""Skip to next track."""
self.send_keypress(KEY_FORWARD)
async def async_media_pause(self) -> None:
def media_seek(self, position: float) -> None:
"""Not supported."""
raise NotImplementedError
def set_volume_level(self, volume: float) -> None:
"""Set volume level, range 0..1."""
raise NotImplementedError
def media_pause(self) -> None:
"""Pause playback."""
self.send_keypress(KEY_PAUSE)
self._attr_state = MediaPlayerState.PAUSED
self.async_write_ha_state()
async def async_media_play(self) -> None:
def select_source(self, source: str) -> None:
"""Not supported."""
raise NotImplementedError
def media_play(self) -> None:
"""Start playback."""
self.send_keypress(KEY_PLAY)
self._attr_state = MediaPlayerState.PLAYING
self.async_write_ha_state()
async def async_volume_up(self) -> None:
def volume_up(self) -> None:
"""Increase volume."""
_LOGGER.debug("%s: volume up", self._logical_address)
self.send_keypress(KEY_VOLUME_UP)
async def async_volume_down(self) -> None:
def volume_down(self) -> None:
"""Decrease volume."""
_LOGGER.debug("%s: volume down", self._logical_address)
self.send_keypress(KEY_VOLUME_DOWN)
async def async_update(self) -> None:
def update(self) -> None:
"""Update device status."""
device = self._device
if device.power_status in [POWER_OFF, 3]:

View File

@@ -20,10 +20,10 @@ _LOGGER = logging.getLogger(__name__)
ENTITY_ID_FORMAT = SWITCH_DOMAIN + ".{}"
async def async_setup_platform(
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
async_add_entities: AddEntitiesCallback,
add_entities: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Find and return HDMI devices as switches."""
@@ -33,7 +33,7 @@ async def async_setup_platform(
for device in discovery_info[ATTR_NEW]:
hdmi_device = hass.data[DOMAIN][device]
entities.append(CecSwitchEntity(hdmi_device, hdmi_device.logical_address))
async_add_entities(entities, True)
add_entities(entities, True)
class CecSwitchEntity(CecEntity, SwitchEntity):
@@ -44,19 +44,19 @@ class CecSwitchEntity(CecEntity, SwitchEntity):
CecEntity.__init__(self, device, logical)
self.entity_id = f"{SWITCH_DOMAIN}.hdmi_{hex(self._logical_address)[2:]}"
async def async_turn_on(self, **kwargs: Any) -> None:
def turn_on(self, **kwargs: Any) -> None:
"""Turn device on."""
self._device.turn_on()
self._attr_is_on = True
self.async_write_ha_state()
self.schedule_update_ha_state(force_refresh=False)
async def async_turn_off(self, **kwargs: Any) -> None:
def turn_off(self, **kwargs: Any) -> None:
"""Turn device off."""
self._device.turn_off()
self._attr_is_on = False
self.async_write_ha_state()
self.schedule_update_ha_state(force_refresh=False)
async def async_update(self) -> None:
def update(self) -> None:
"""Update device status."""
device = self._device
if device.power_status in {POWER_OFF, 3}:

View File

@@ -10,7 +10,7 @@
"loggers": ["pyhap"],
"requirements": [
"HAP-python==5.0.0",
"fnv-hash-fast==2.0.2",
"fnv-hash-fast==2.0.0",
"homekit-audio-proxy==1.2.1",
"PyQRCode==1.2.1",
"base36==0.1.1"

View File

@@ -1,8 +1,7 @@
{
"common": {
"condition_behavior_name": "Condition passes if",
"trigger_behavior_name": "Trigger when",
"trigger_for_name": "For at least"
"trigger_behavior_name": "Trigger when"
},
"conditions": {
"is_detected": {
@@ -46,9 +45,6 @@
"fields": {
"behavior": {
"name": "[%key:component::motion::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::motion::common::trigger_for_name%]"
}
},
"name": "Motion cleared"
@@ -58,9 +54,6 @@
"fields": {
"behavior": {
"name": "[%key:component::motion::common::trigger_behavior_name%]"
},
"for": {
"name": "[%key:component::motion::common::trigger_for_name%]"
}
},
"name": "Motion detected"

View File

@@ -9,11 +9,6 @@
- first
- last
- any
for:
required: true
default: 00:00:00
selector:
duration:
detected:
fields: *trigger_common_fields

View File

@@ -8,7 +8,7 @@
"quality_scale": "internal",
"requirements": [
"SQLAlchemy==2.0.49",
"fnv-hash-fast==2.0.2",
"fnv-hash-fast==2.0.0",
"psutil-home-assistant==0.0.1"
]
}

View File

@@ -4,5 +4,5 @@
"codeowners": ["@fabaff"],
"documentation": "https://www.home-assistant.io/integrations/serial",
"iot_class": "local_polling",
"requirements": ["serialx==1.2.2"]
"requirements": ["pyserial-asyncio-fast==0.16"]
}

View File

@@ -3,11 +3,11 @@
from __future__ import annotations
import asyncio
from asyncio import Task
import json
import logging
from serialx import Parity, SerialException, StopBits, open_serial_connection
from serial import SerialException
import serial_asyncio_fast as serial_asyncio
import voluptuous as vol
from homeassistant.components.sensor import (
@@ -18,7 +18,6 @@ from homeassistant.const import CONF_NAME, CONF_VALUE_TEMPLATE, EVENT_HOMEASSIST
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.template import Template
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
_LOGGER = logging.getLogger(__name__)
@@ -34,9 +33,9 @@ CONF_DSRDTR = "dsrdtr"
DEFAULT_NAME = "Serial Sensor"
DEFAULT_BAUDRATE = 9600
DEFAULT_BYTESIZE = 8
DEFAULT_PARITY = Parity.NONE
DEFAULT_STOPBITS = StopBits.ONE
DEFAULT_BYTESIZE = serial_asyncio.serial.EIGHTBITS
DEFAULT_PARITY = serial_asyncio.serial.PARITY_NONE
DEFAULT_STOPBITS = serial_asyncio.serial.STOPBITS_ONE
DEFAULT_XONXOFF = False
DEFAULT_RTSCTS = False
DEFAULT_DSRDTR = False
@@ -47,21 +46,28 @@ PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
vol.Optional(CONF_BAUDRATE, default=DEFAULT_BAUDRATE): cv.positive_int,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
vol.Optional(CONF_BYTESIZE, default=DEFAULT_BYTESIZE): vol.In([5, 6, 7, 8]),
vol.Optional(CONF_BYTESIZE, default=DEFAULT_BYTESIZE): vol.In(
[
serial_asyncio.serial.FIVEBITS,
serial_asyncio.serial.SIXBITS,
serial_asyncio.serial.SEVENBITS,
serial_asyncio.serial.EIGHTBITS,
]
),
vol.Optional(CONF_PARITY, default=DEFAULT_PARITY): vol.In(
[
Parity.NONE,
Parity.EVEN,
Parity.ODD,
Parity.MARK,
Parity.SPACE,
serial_asyncio.serial.PARITY_NONE,
serial_asyncio.serial.PARITY_EVEN,
serial_asyncio.serial.PARITY_ODD,
serial_asyncio.serial.PARITY_MARK,
serial_asyncio.serial.PARITY_SPACE,
]
),
vol.Optional(CONF_STOPBITS, default=DEFAULT_STOPBITS): vol.In(
[
StopBits.ONE,
StopBits.ONE_POINT_FIVE,
StopBits.TWO,
serial_asyncio.serial.STOPBITS_ONE,
serial_asyncio.serial.STOPBITS_ONE_POINT_FIVE,
serial_asyncio.serial.STOPBITS_TWO,
]
),
vol.Optional(CONF_XONXOFF, default=DEFAULT_XONXOFF): cv.boolean,
@@ -78,17 +84,28 @@ async def async_setup_platform(
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Serial sensor platform."""
name = config.get(CONF_NAME)
port = config.get(CONF_SERIAL_PORT)
baudrate = config.get(CONF_BAUDRATE)
bytesize = config.get(CONF_BYTESIZE)
parity = config.get(CONF_PARITY)
stopbits = config.get(CONF_STOPBITS)
xonxoff = config.get(CONF_XONXOFF)
rtscts = config.get(CONF_RTSCTS)
dsrdtr = config.get(CONF_DSRDTR)
value_template = config.get(CONF_VALUE_TEMPLATE)
sensor = SerialSensor(
name=config[CONF_NAME],
port=config[CONF_SERIAL_PORT],
baudrate=config[CONF_BAUDRATE],
bytesize=config[CONF_BYTESIZE],
parity=config[CONF_PARITY],
stopbits=config[CONF_STOPBITS],
xonxoff=config[CONF_XONXOFF],
rtscts=config[CONF_RTSCTS],
dsrdtr=config[CONF_DSRDTR],
value_template=config.get(CONF_VALUE_TEMPLATE),
name,
port,
baudrate,
bytesize,
parity,
stopbits,
xonxoff,
rtscts,
dsrdtr,
value_template,
)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, sensor.stop_serial_read)
@@ -102,17 +119,17 @@ class SerialSensor(SensorEntity):
def __init__(
self,
name: str,
port: str,
baudrate: int,
bytesize: int,
parity: Parity,
stopbits: StopBits,
xonxoff: bool,
rtscts: bool,
dsrdtr: bool,
value_template: Template | None,
) -> None:
name,
port,
baudrate,
bytesize,
parity,
stopbits,
xonxoff,
rtscts,
dsrdtr,
value_template,
):
"""Initialize the Serial sensor."""
self._attr_name = name
self._port = port
@@ -123,12 +140,12 @@ class SerialSensor(SensorEntity):
self._xonxoff = xonxoff
self._rtscts = rtscts
self._dsrdtr = dsrdtr
self._serial_loop_task: Task[None] | None = None
self._serial_loop_task = None
self._template = value_template
async def async_added_to_hass(self) -> None:
"""Handle when an entity is about to be added to Home Assistant."""
self._serial_loop_task = self.hass.async_create_background_task(
self._serial_loop_task = self.hass.loop.create_task(
self.serial_read(
self._port,
self._baudrate,
@@ -138,31 +155,26 @@ class SerialSensor(SensorEntity):
self._xonxoff,
self._rtscts,
self._dsrdtr,
),
"Serial reader",
)
)
async def serial_read(
self,
device: str,
baudrate: int,
bytesize: int,
parity: Parity,
stopbits: StopBits,
xonxoff: bool,
rtscts: bool,
dsrdtr: bool,
device,
baudrate,
bytesize,
parity,
stopbits,
xonxoff,
rtscts,
dsrdtr,
**kwargs,
):
"""Read the data from the port."""
logged_error = False
while True:
reader = None
writer = None
try:
reader, writer = await open_serial_connection(
reader, _ = await serial_asyncio.open_serial_connection(
url=device,
baudrate=baudrate,
bytesize=bytesize,
@@ -173,7 +185,8 @@ class SerialSensor(SensorEntity):
dsrdtr=dsrdtr,
**kwargs,
)
except OSError, SerialException, TimeoutError:
except SerialException:
if not logged_error:
_LOGGER.exception(
"Unable to connect to the serial device %s. Will retry", device
@@ -184,15 +197,15 @@ class SerialSensor(SensorEntity):
_LOGGER.debug("Serial device %s connected", device)
while True:
try:
line_bytes = await reader.readline()
except OSError, SerialException:
line = await reader.readline()
except SerialException:
_LOGGER.exception(
"Error while reading serial device %s", device
)
await self._handle_error()
break
else:
line = line_bytes.decode("utf-8").strip()
line = line.decode("utf-8").strip()
try:
data = json.loads(line)
@@ -210,10 +223,6 @@ class SerialSensor(SensorEntity):
_LOGGER.debug("Received: %s", line)
self._attr_native_value = line
self.async_write_ha_state()
finally:
if writer is not None:
writer.close()
await writer.wait_closed()
async def _handle_error(self):
"""Handle error for serial connection."""

View File

@@ -7,5 +7,5 @@
"integration_type": "system",
"iot_class": "local_push",
"quality_scale": "internal",
"requirements": ["PyTurboJPEG==1.8.3", "av==16.0.1", "numpy==2.3.2"]
"requirements": ["PyTurboJPEG==1.8.0", "av==16.0.1", "numpy==2.3.2"]
}

View File

@@ -8,5 +8,5 @@
"iot_class": "cloud_polling",
"loggers": ["twentemilieu"],
"quality_scale": "silver",
"requirements": ["twentemilieu==3.0.0"]
"requirements": ["twentemilieu==2.2.1"]
}

View File

@@ -26,20 +26,24 @@ from homeassistant.core import (
from homeassistant.helpers import config_validation as cv, discovery_flow
from homeassistant.helpers.debounce import Debouncer
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.service_info.usb import UsbServiceInfo
from homeassistant.helpers.service_info.usb import UsbServiceInfo as _UsbServiceInfo
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import USBMatcher, async_get_usb
from homeassistant.util.hass_dict import HassKey
from .const import DOMAIN
from .models import SerialDevice, USBDevice
from .models import (
SerialDevice, # noqa: F401
USBDevice,
)
from .utils import (
async_scan_serial_ports,
scan_serial_ports,
usb_device_from_path,
scan_serial_ports, # noqa: F401
usb_device_from_path, # noqa: F401
usb_device_from_port, # noqa: F401
usb_device_matches_matcher,
usb_service_info_from_device,
usb_unique_id_from_service_info,
usb_unique_id_from_service_info, # noqa: F401
)
_LOGGER = logging.getLogger(__name__)
@@ -52,17 +56,9 @@ REQUEST_SCAN_COOLDOWN = 10 # 10 second cooldown
ADD_REMOVE_SCAN_COOLDOWN = 5 # 5 second cooldown to give devices a chance to register
__all__ = [
"SerialDevice",
"USBCallbackMatcher",
"USBDevice",
"async_register_port_event_callback",
"async_register_scan_request_callback",
"async_scan_serial_ports",
"scan_serial_ports",
"usb_device_from_path",
"usb_device_matches_matcher",
"usb_service_info_from_device",
"usb_unique_id_from_service_info",
]
CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
@@ -362,7 +358,7 @@ class USBDiscovery:
for matcher in matched:
for flow in self.hass.config_entries.flow.async_progress_by_init_data_type(
UsbServiceInfo,
_UsbServiceInfo,
lambda flow_service_info: flow_service_info == service_info,
):
if matcher["domain"] != flow["handler"]:

View File

@@ -7,5 +7,5 @@
"integration_type": "system",
"iot_class": "local_push",
"quality_scale": "internal",
"requirements": ["aiousbwatcher==1.1.1", "serialx==1.2.2"]
"requirements": ["aiousbwatcher==1.1.1", "pyserial==3.5"]
}

View File

@@ -3,10 +3,12 @@
from __future__ import annotations
from collections.abc import Sequence
import dataclasses
import fnmatch
import os
from serialx import SerialPortInfo, list_serial_ports
from serial.tools.list_ports import comports
from serial.tools.list_ports_common import ListPortInfo
from homeassistant.core import HomeAssistant
from homeassistant.helpers.service_info.usb import UsbServiceInfo
@@ -15,8 +17,8 @@ from homeassistant.loader import USBMatcher
from .models import SerialDevice, USBDevice
def usb_device_from_port(port: SerialPortInfo) -> USBDevice:
"""Convert serialx SerialPortInfo to USBDevice."""
def usb_device_from_port(port: ListPortInfo) -> USBDevice:
"""Convert serial ListPortInfo to USBDevice."""
assert port.vid is not None
assert port.pid is not None
@@ -26,30 +28,53 @@ def usb_device_from_port(port: SerialPortInfo) -> USBDevice:
pid=f"{hex(port.pid)[2:]:0>4}".upper(),
serial_number=port.serial_number,
manufacturer=port.manufacturer,
description=port.product,
description=port.description,
)
def serial_device_from_port(port: SerialPortInfo) -> SerialDevice:
"""Convert serialx SerialPortInfo to SerialDevice."""
def serial_device_from_port(port: ListPortInfo) -> SerialDevice:
"""Convert serial ListPortInfo to SerialDevice."""
return SerialDevice(
device=port.device,
serial_number=port.serial_number,
manufacturer=port.manufacturer,
description=port.product,
description=port.description,
)
def usb_serial_device_from_port(port: SerialPortInfo) -> USBDevice | SerialDevice:
"""Convert serialx SerialPortInfo to USBDevice or SerialDevice."""
if port.vid is not None and port.pid is not None:
def usb_serial_device_from_port(port: ListPortInfo) -> USBDevice | SerialDevice:
"""Convert serial ListPortInfo to USBDevice or SerialDevice."""
if port.vid is not None or port.pid is not None:
assert port.vid is not None
assert port.pid is not None
return usb_device_from_port(port)
return serial_device_from_port(port)
def scan_serial_ports() -> Sequence[USBDevice | SerialDevice]:
"""Scan serial ports and return USB and other serial devices."""
return [usb_serial_device_from_port(port) for port in list_serial_ports()]
# Scan all symlinks first
by_id = "/dev/serial/by-id"
realpath_to_by_id: dict[str, str] = {}
if os.path.isdir(by_id):
for path in (entry.path for entry in os.scandir(by_id) if entry.is_symlink()):
realpath_to_by_id[os.path.realpath(path)] = path
serial_ports = []
for port in comports():
device = usb_serial_device_from_port(port)
device_path = realpath_to_by_id.get(port.device, port.device)
if device_path != port.device:
# Prefer the unique /dev/serial/by-id/ path if it exists
device = dataclasses.replace(device, device=device_path)
serial_ports.append(device)
return serial_ports
async def async_scan_serial_ports(

View File

@@ -7,7 +7,6 @@ import asyncio
from collections import defaultdict
from collections.abc import Callable, Coroutine, Iterable, Mapping
from dataclasses import dataclass, field
from datetime import timedelta
import functools
import inspect
import logging
@@ -32,7 +31,6 @@ from homeassistant.const import (
CONF_ENABLED,
CONF_ENTITY_ID,
CONF_EVENT_DATA,
CONF_FOR,
CONF_ID,
CONF_OPTIONS,
CONF_PLATFORM,
@@ -76,7 +74,6 @@ from .automation import (
get_relative_description_key,
move_options_fields_to_top_level,
)
from .event import async_track_same_state
from .integration_platform import async_process_integration_platforms
from .selector import (
NumericThresholdMode,
@@ -343,7 +340,6 @@ ENTITY_STATE_TRIGGER_SCHEMA_FIRST_LAST = ENTITY_STATE_TRIGGER_SCHEMA.extend(
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
[BEHAVIOR_FIRST, BEHAVIOR_LAST, BEHAVIOR_ANY]
),
vol.Optional(CONF_FOR): cv.positive_time_period_dict,
},
}
)
@@ -372,7 +368,6 @@ class EntityTriggerBase(Trigger):
if TYPE_CHECKING:
assert config.target is not None
self._options = config.options or {}
self._duration: timedelta | None = self._options.get(CONF_FOR)
self._target = config.target
def entity_filter(self, entities: set[str]) -> set[str]:
@@ -403,13 +398,16 @@ class EntityTriggerBase(Trigger):
and state.state not in self._excluded_states
)
def count_matches(self, entity_ids: set[str]) -> int:
"""Count the number of entity states that match."""
return sum(
self.is_valid_state(state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
and state.state not in self._excluded_states
def check_one_match(self, entity_ids: set[str]) -> bool:
"""Check that only one entity state matches."""
return (
sum(
self.is_valid_state(state)
for entity_id in entity_ids
if (state := self._hass.states.get(entity_id)) is not None
and state.state not in self._excluded_states
)
== 1
)
@override
@@ -418,8 +416,7 @@ class EntityTriggerBase(Trigger):
) -> CALLBACK_TYPE:
"""Attach the trigger to an action runner."""
behavior: str = self._options.get(ATTR_BEHAVIOR, BEHAVIOR_ANY)
unsub_track_same: dict[str, Callable[[], None]] = {}
behavior = self._options.get(ATTR_BEHAVIOR)
@callback
def state_change_listener(
@@ -431,30 +428,6 @@ class EntityTriggerBase(Trigger):
from_state = event.data["old_state"]
to_state = event.data["new_state"]
def state_still_valid(
_: str, from_state: State | None, to_state: State | None
) -> bool:
"""Check if the state is still valid during the duration wait.
Called by async_track_same_state on each state change to
determine whether to cancel the timer.
For behavior any, checks the individual entity's state.
For behavior first/last, checks the combined state.
"""
if behavior == BEHAVIOR_LAST:
return self.check_all_match(
target_state_change_data.targeted_entity_ids
)
if behavior == BEHAVIOR_FIRST:
return (
self.count_matches(target_state_change_data.targeted_entity_ids)
>= 1
)
# Behavior any: check the individual entity's state
if not to_state:
return False
return self.is_valid_state(to_state)
if not from_state or not to_state:
return
@@ -472,65 +445,25 @@ class EntityTriggerBase(Trigger):
):
return
elif behavior == BEHAVIOR_FIRST:
# Note: It's enough to test for exactly 1 match here because if there
# were previously 2 matches the transition would not be valid and we
# would have returned already.
if (
self.count_matches(target_state_change_data.targeted_entity_ids)
!= 1
if not self.check_one_match(
target_state_change_data.targeted_entity_ids
):
return
@callback
def call_action() -> None:
"""Call action with right context."""
# After a `for` delay, keep the original triggering event payload.
# `async_track_same_state` only verifies the state remained valid
# for the configured duration before firing the action.
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
"for": self._duration,
},
f"state of {entity_id}",
event.context,
)
if not self._duration:
call_action()
return
subscription_key = entity_id if behavior == BEHAVIOR_ANY else behavior
if subscription_key in unsub_track_same:
unsub_track_same.pop(subscription_key)()
unsub_track_same[subscription_key] = async_track_same_state(
self._hass,
self._duration,
call_action,
state_still_valid,
entity_ids=(
entity_id
if behavior == BEHAVIOR_ANY
else target_state_change_data.targeted_entity_ids
),
run_action(
{
ATTR_ENTITY_ID: entity_id,
"from_state": from_state,
"to_state": to_state,
},
f"state of {entity_id}",
event.context,
)
unsub = async_track_target_selector_state_change_event(
return async_track_target_selector_state_change_event(
self._hass, self._target, state_change_listener, self.entity_filter
)
@callback
def async_remove() -> None:
"""Remove state listeners async."""
unsub()
for async_remove in unsub_track_same.values():
async_remove()
unsub_track_same.clear()
return async_remove
class EntityTargetStateTriggerBase(EntityTriggerBase):
"""Trigger for entity state changes to a specific state.

View File

@@ -32,7 +32,7 @@ cronsim==2.7
cryptography==46.0.7
dbus-fast==4.0.4
file-read-backwards==2.0.0
fnv-hash-fast==2.0.2
fnv-hash-fast==2.0.0
go2rtc-client==0.4.0
ha-ffmpeg==3.2.2
habluetooth==6.0.0
@@ -57,13 +57,13 @@ PyJWT==2.10.1
pymicro-vad==1.0.1
PyNaCl==1.6.2
pyOpenSSL==26.0.0
pyserial==3.5
pyspeex-noise==1.0.2
python-slugify==8.0.4
PyTurboJPEG==1.8.3
PyTurboJPEG==1.8.0
PyYAML==6.0.3
requests==2.33.1
securetar==2026.4.1
serialx==1.2.2
SQLAlchemy==2.0.49
standard-aifc==3.13.0
standard-telnetlib==3.13.0

View File

@@ -44,7 +44,7 @@ dependencies = [
"certifi>=2021.5.30",
"ciso8601==2.3.3",
"cronsim==2.7",
"fnv-hash-fast==2.0.2",
"fnv-hash-fast==2.0.0",
# hass-nabucasa is imported by helpers which don't depend on the cloud
# integration
"hass-nabucasa==2.2.0",

4
requirements.txt generated
View File

@@ -22,7 +22,7 @@ certifi>=2021.5.30
ciso8601==2.3.3
cronsim==2.7
cryptography==46.0.7
fnv-hash-fast==2.0.2
fnv-hash-fast==2.0.0
ha-ffmpeg==3.2.2
hass-nabucasa==2.2.0
hassil==3.5.0
@@ -44,7 +44,7 @@ pymicro-vad==1.0.1
pyOpenSSL==26.0.0
pyspeex-noise==1.0.2
python-slugify==8.0.4
PyTurboJPEG==1.8.3
PyTurboJPEG==1.8.0
PyYAML==6.0.3
requests==2.33.1
securetar==2026.4.1

16
requirements_all.txt generated
View File

@@ -96,7 +96,7 @@ PyTransportNSW==0.1.1
# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.8.3
PyTurboJPEG==1.8.0
# homeassistant.components.vicare
PyViCare==2.59.0
@@ -1006,7 +1006,7 @@ flux-led==1.2.0
# homeassistant.components.homekit
# homeassistant.components.recorder
fnv-hash-fast==2.0.2
fnv-hash-fast==2.0.0
# homeassistant.components.foobot
foobot_async==1.0.0
@@ -2465,6 +2465,13 @@ pysensibo==1.2.1
# homeassistant.components.senz
pysenz==1.0.2
# homeassistant.components.serial
pyserial-asyncio-fast==0.16
# homeassistant.components.acer_projector
# homeassistant.components.usb
pyserial==3.5
# homeassistant.components.sesame
pysesame2==1.0.1
@@ -2921,10 +2928,7 @@ sentence-stream==1.2.0
# homeassistant.components.sentry
sentry-sdk==2.48.0
# homeassistant.components.acer_projector
# homeassistant.components.homeassistant_hardware
# homeassistant.components.serial
# homeassistant.components.usb
# homeassistant.components.zha
serialx==1.2.2
@@ -3159,7 +3163,7 @@ tuya-device-handlers==0.0.17
tuya-device-sharing-sdk==0.2.8
# homeassistant.components.twentemilieu
twentemilieu==3.0.0
twentemilieu==2.2.1
# homeassistant.components.twilio
twilio==6.32.0

View File

@@ -93,7 +93,7 @@ PyTransportNSW==0.1.1
# homeassistant.components.camera
# homeassistant.components.stream
PyTurboJPEG==1.8.3
PyTurboJPEG==1.8.0
# homeassistant.components.vicare
PyViCare==2.59.0
@@ -894,7 +894,7 @@ flux-led==1.2.0
# homeassistant.components.homekit
# homeassistant.components.recorder
fnv-hash-fast==2.0.2
fnv-hash-fast==2.0.0
# homeassistant.components.foobot
foobot_async==1.0.0
@@ -2109,6 +2109,10 @@ pysensibo==1.2.1
# homeassistant.components.senz
pysenz==1.0.2
# homeassistant.components.acer_projector
# homeassistant.components.usb
pyserial==3.5
# homeassistant.components.seventeentrack
pyseventeentrack==1.1.3
@@ -2481,10 +2485,7 @@ sentence-stream==1.2.0
# homeassistant.components.sentry
sentry-sdk==2.48.0
# homeassistant.components.acer_projector
# homeassistant.components.homeassistant_hardware
# homeassistant.components.serial
# homeassistant.components.usb
# homeassistant.components.zha
serialx==1.2.2
@@ -2674,7 +2675,7 @@ tuya-device-handlers==0.0.17
tuya-device-sharing-sdk==0.2.8
# homeassistant.components.twentemilieu
twentemilieu==3.0.0
twentemilieu==2.2.1
# homeassistant.components.twilio
twilio==6.32.0

View File

@@ -1,5 +1,6 @@
"""Tests for the HDMI-CEC media player platform."""
from collections.abc import Callable
from typing import Any
from pycec.const import (
@@ -57,6 +58,39 @@ from homeassistant.core import HomeAssistant
from . import MockHDMIDevice, assert_key_press_release
from .conftest import CecEntityCreator, HDMINetworkCreator
type AssertState = Callable[[str, str], None]
@pytest.fixture(
name="assert_state",
params=[
False,
pytest.param(
True,
marks=pytest.mark.xfail(
reason="""State isn't updated because the function is missing the
`schedule_update_ha_state` for a correct push entity. Would still
update once the data comes back from the device."""
),
),
],
ids=["skip_assert_state", "run_assert_state"],
)
def assert_state_fixture(request: pytest.FixtureRequest) -> AssertState:
"""Allow for skipping the assert state changes.
This is broken in this entity, but we still want to test that
the rest of the code works as expected.
"""
def _test_state(state: str, expected: str) -> None:
if request.param:
assert state == expected
else:
assert True
return _test_state
async def test_load_platform(
hass: HomeAssistant,
@@ -108,6 +142,7 @@ async def test_service_on(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
) -> None:
"""Test that media_player triggers on `on` service."""
hdmi_network = await create_hdmi_network({"platform": "media_player"})
@@ -122,17 +157,19 @@ async def test_service_on(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
mock_hdmi_device.turn_on.assert_called_once_with()
state = hass.states.get("media_player.hdmi_3")
assert state.state == STATE_ON
assert_state(state.state, STATE_ON)
async def test_service_off(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
) -> None:
"""Test that media_player triggers on `off` service."""
hdmi_network = await create_hdmi_network({"platform": "media_player"})
@@ -151,7 +188,7 @@ async def test_service_off(
mock_hdmi_device.turn_off.assert_called_once_with()
state = hass.states.get("media_player.hdmi_3")
assert state.state == STATE_OFF
assert_state(state.state, STATE_OFF)
@pytest.mark.parametrize(
@@ -280,6 +317,7 @@ async def test_volume_services(
data,
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
@@ -310,6 +348,7 @@ async def test_track_change_services(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
@@ -334,6 +373,7 @@ async def test_playback_services(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
service: str,
key: int,
expected_state: str,
@@ -349,12 +389,13 @@ async def test_playback_services(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=key)
state = hass.states.get("media_player.hdmi_3")
assert state.state == expected_state
assert_state(state.state, expected_state)
@pytest.mark.xfail(reason="PLAY feature isn't enabled")
@@ -362,6 +403,7 @@ async def test_play_pause_service(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
create_cec_entity: CecEntityCreator,
assert_state: AssertState,
) -> None:
"""Test play pause service."""
hdmi_network = await create_hdmi_network({"platform": "media_player"})
@@ -376,12 +418,13 @@ async def test_play_pause_service(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 2
assert_key_press_release(mock_hdmi_device.send_command, dst=3, key=KEY_PAUSE)
state = hass.states.get("media_player.hdmi_3")
assert state.state == STATE_PAUSED
assert_state(state.state, STATE_PAUSED)
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
@@ -389,6 +432,7 @@ async def test_play_pause_service(
{ATTR_ENTITY_ID: "media_player.hdmi_3"},
blocking=True,
)
await hass.async_block_till_done()
assert mock_hdmi_device.send_command.call_count == 4
assert_key_press_release(mock_hdmi_device.send_command, 1, dst=3, key=KEY_PLAY)
@@ -483,6 +527,9 @@ async def test_starting_state(
assert state.state == expected_state
@pytest.mark.xfail(
reason="The code only sets the state to unavailable, doesn't set the `_attr_available` to false."
)
async def test_unavailable_status(
hass: HomeAssistant,
create_hdmi_network: HDMINetworkCreator,
@@ -494,7 +541,6 @@ async def test_unavailable_status(
await create_cec_entity(hdmi_network, mock_hdmi_device)
hass.bus.async_fire(EVENT_HDMI_CEC_UNAVAILABLE)
await hass.async_block_till_done()
state = hass.states.get("media_player.hdmi_3")
assert state.state == STATE_UNAVAILABLE

View File

@@ -2634,7 +2634,7 @@ async def help_test_reload_with_config(
"""Test reloading with supplied config."""
new_yaml_config_file = tmp_path / "configuration.yaml"
def _write_yaml_config() -> str:
def _write_yaml_config() -> None:
new_yaml_config = yaml.dump(config)
new_yaml_config_file.write_text(new_yaml_config)
assert new_yaml_config_file.read_text() == new_yaml_config

View File

@@ -536,6 +536,7 @@ async def test_loading_subentries(
async def test_loading_subentry_with_bad_component_schema(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
mqtt_config_subentries_data: tuple[dict[str, Any]],
device_registry: dr.DeviceRegistry,
caplog: pytest.LogCaptureFixture,
) -> None:
@@ -564,9 +565,10 @@ async def test_loading_subentry_with_bad_component_schema(
)
],
)
async def test_qos_on_mqtt_device_from_subentry(
async def test_qos_on_mqt_device_from_subentry(
hass: HomeAssistant,
mqtt_mock_entry: MqttMockHAClientGenerator,
mqtt_config_subentries_data: tuple[dict[str, Any]],
device_registry: dr.DeviceRegistry,
) -> None:
"""Test QoS is set correctly on entities from MQTT device."""

View File

@@ -7,7 +7,6 @@ import os
from unittest.mock import MagicMock, Mock, call, patch, sentinel
import pytest
from serialx import SerialPortInfo
from homeassistant import config_entries
from homeassistant.components import usb
@@ -1297,55 +1296,112 @@ async def test_register_port_event_callback_failure(
assert "Failure 2" in caplog.text
async def test_async_scan_serial_ports(hass: HomeAssistant) -> None:
"""Test async_scan_serial_ports parsing."""
with patch(
"homeassistant.components.usb.utils.list_serial_ports",
return_value=[
SerialPortInfo(
device="/dev/ttyAMA1",
resolved_device="/dev/ttyAMA1",
vid=None,
pid=None,
serial_number=None,
manufacturer=None,
product=None,
bcd_device=None,
interface_description=None,
interface_num=None,
),
SerialPortInfo(
device="/dev/serial/by-id/usb-Nabu_Casa_ZBT-2_10B41DE589FC-if00",
resolved_device="/dev/ttyACM0",
vid=12346,
pid=16385,
serial_number="10B41DE589FC",
manufacturer="Nabu Casa",
product="ZBT-2",
bcd_device=257,
interface_description="Nabu Casa ZBT-2",
interface_num=0,
),
],
async def test_async_scan_serial_ports_with_unique_symlinks(
hass: HomeAssistant,
) -> None:
"""Test async_scan_serial_ports returns devices with unique /dev/serial/by-id paths."""
entry1 = MagicMock(spec_set=os.DirEntry)
entry1.is_symlink.return_value = True
entry1.path = "/dev/serial/by-id/usb-device1"
entry2 = MagicMock(spec_set=os.DirEntry)
entry2.is_symlink.return_value = True
entry2.path = "/dev/serial/by-id/usb-device2"
mock_port1 = MagicMock()
mock_port1.device = "/dev/ttyUSB0"
mock_port1.vid = 0x1234
mock_port1.pid = 0x5678
mock_port1.serial_number = "ABC123"
mock_port1.manufacturer = "Test Manufacturer"
mock_port1.description = "Test Device"
mock_port2 = MagicMock()
mock_port2.device = "/dev/ttyUSB1"
mock_port2.vid = 0xABCD
mock_port2.pid = 0xEF01
mock_port2.serial_number = "XYZ789"
mock_port2.manufacturer = "Another Manufacturer"
mock_port2.description = "Another Device"
def mock_realpath(path: str) -> str:
realpath_map = {
"/dev/serial/by-id/usb-device1": "/dev/ttyUSB0",
"/dev/serial/by-id/usb-device2": "/dev/ttyUSB1",
}
return realpath_map.get(path, path)
with (
patch("os.path.isdir", return_value=True),
patch("os.scandir", return_value=[entry1, entry2]),
patch("os.path.realpath", side_effect=mock_realpath),
patch(
"homeassistant.components.usb.utils.comports",
return_value=[mock_port1, mock_port2],
),
):
devices = await async_scan_serial_ports(hass)
assert devices == [
SerialDevice(
device="/dev/ttyAMA1",
serial_number=None,
manufacturer=None,
description=None,
assert len(devices) == 2
assert devices[0].device == "/dev/serial/by-id/usb-device1"
assert devices[0].vid == "1234"
assert devices[1].device == "/dev/serial/by-id/usb-device2"
assert devices[1].vid == "ABCD"
async def test_async_scan_serial_ports_without_unique_symlinks(
hass: HomeAssistant,
) -> None:
"""Test async_scan_serial_ports returns devices with original paths when no symlinks exist."""
mock_port = MagicMock()
mock_port.device = "/dev/ttyUSB0"
mock_port.vid = 0x1234
mock_port.pid = 0x5678
mock_port.serial_number = "ABC123"
mock_port.manufacturer = "Test Manufacturer"
mock_port.description = "Test Device"
with (
patch("os.path.isdir", return_value=False),
patch("os.path.realpath", side_effect=lambda x: x),
patch(
"homeassistant.components.usb.utils.comports",
return_value=[mock_port],
),
USBDevice(
device="/dev/serial/by-id/usb-Nabu_Casa_ZBT-2_10B41DE589FC-if00",
vid="303A",
pid="4001",
serial_number="10B41DE589FC",
manufacturer="Nabu Casa",
description="ZBT-2",
):
devices = await async_scan_serial_ports(hass)
assert len(devices) == 1
assert devices[0].device == "/dev/ttyUSB0"
assert devices[0].vid == "1234"
async def test_async_scan_serial_ports_no_vid_pid(hass: HomeAssistant) -> None:
"""Test async_scan_serial_ports returns devices without VID:PID."""
mock_port = MagicMock()
mock_port.device = "/dev/ttyAMA1"
mock_port.vid = None
mock_port.pid = None
mock_port.serial_number = None
mock_port.manufacturer = None
mock_port.description = None
with (
patch("os.path.isdir", return_value=False),
patch("os.path.realpath", side_effect=lambda x: x),
patch(
"homeassistant.components.usb.utils.comports",
return_value=[mock_port],
),
]
):
devices = await async_scan_serial_ports(hass)
assert len(devices) == 1
assert isinstance(devices[0], SerialDevice)
assert devices[0].device == "/dev/ttyAMA1"
assert devices[0].serial_number is None
assert devices[0].manufacturer is None
assert devices[0].description is None
def test_usb_device_from_path_finds_by_symlink() -> None:

View File

@@ -17,6 +17,7 @@ from unittest.mock import (
import uuid
import pytest
from serial.tools.list_ports_common import ListPortInfo
from zha.application.const import RadioType
from zigpy.application import ControllerApplication
from zigpy.backups import BackupManager
@@ -32,7 +33,7 @@ import zigpy.types
from homeassistant import config_entries
from homeassistant.components.hassio import AddonError, AddonState
from homeassistant.components.usb import SerialDevice, USBDevice
from homeassistant.components.usb import USBDevice
from homeassistant.components.zha import config_flow, radio_manager
from homeassistant.components.zha.const import (
CONF_BAUDRATE,
@@ -65,7 +66,9 @@ from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from tests.common import MockConfigEntry
type RadioPicker = Callable[[RadioType], Coroutine[Any, Any, ConfigFlowResult]]
type RadioPicker = Callable[
[RadioType], Coroutine[Any, Any, tuple[ConfigFlowResult, ListPortInfo]]
]
PROBE_FUNCTION_PATH = "zigbee.application.ControllerApplication.probe"
@@ -165,14 +168,15 @@ def mock_detect_radio_type(
return detect
def com_port(device="/dev/ttyUSB1234") -> SerialDevice:
def com_port(device="/dev/ttyUSB1234") -> ListPortInfo:
"""Mock of a serial port."""
return SerialDevice(
device=device,
serial_number="1234",
manufacturer="Virtual serial port",
description="Some serial port",
)
port = ListPortInfo(device)
port.serial_number = "1234"
port.manufacturer = "Virtual serial port"
port.device = device
port.description = "Some serial port"
return port
def usb_port(device="/dev/ttyUSB1234") -> USBDevice:
@@ -1125,7 +1129,7 @@ async def test_user_flow_not_detected(hass: HomeAssistant) -> None:
"""Test user flow, radio not detected."""
port = com_port()
port_select = f"{port.device} - {port.description}, s/n: {port.serial_number} - {port.manufacturer}"
port_select = f"{port}, s/n: {port.serial_number} - {port.manufacturer}"
result = await hass.config_entries.flow.async_init(
DOMAIN,
@@ -1696,12 +1700,14 @@ def test_prevent_overwrite_ezsp_ieee() -> None:
@pytest.fixture
def advanced_pick_radio(hass: HomeAssistant) -> Generator[RadioPicker]:
def advanced_pick_radio(
hass: HomeAssistant,
) -> Generator[RadioPicker]:
"""Fixture for the first step of the config flow (where a radio is picked)."""
async def wrapper(radio_type: RadioType) -> ConfigFlowResult:
async def wrapper(radio_type: RadioType) -> tuple[ConfigFlowResult, ListPortInfo]:
port = com_port()
port_select = f"{port.device} - {port.description}, s/n: {port.serial_number} - {port.manufacturer}"
port_select = f"{port}, s/n: {port.serial_number} - {port.manufacturer}"
with patch(
"homeassistant.components.zha.radio_manager.ZhaRadioManager.detect_radio_type",

View File

@@ -4,6 +4,7 @@ from collections.abc import Generator
from unittest.mock import AsyncMock, MagicMock, create_autospec, patch
import pytest
import serial.tools.list_ports
from zha.application.const import RadioType
from zigpy.backups import BackupManager
import zigpy.config
@@ -76,6 +77,17 @@ def mock_detect_radio_type(
return detect
def com_port(device="/dev/ttyUSB1234"):
"""Mock of a serial port."""
port = serial.tools.list_ports_common.ListPortInfo("/dev/ttyUSB1234")
port.serial_number = "1234"
port.manufacturer = "Virtual serial port"
port.device = device
port.description = "Some serial port"
return port
@pytest.fixture
def mock_create_zigpy_app() -> Generator[MagicMock]:
"""Mock the radio connection."""

View File

@@ -2,13 +2,11 @@
from collections.abc import Mapping
from contextlib import AbstractContextManager, nullcontext as does_not_raise
import datetime
import io
import logging
from typing import Any
from unittest.mock import ANY, AsyncMock, MagicMock, Mock, call, patch
from freezegun.api import FrozenDateTimeFactory
import pytest
from pytest_unordered import unordered
import voluptuous as vol
@@ -22,7 +20,6 @@ from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_UNIT_OF_MEASUREMENT,
CONF_ENTITY_ID,
CONF_FOR,
CONF_OPTIONS,
CONF_PLATFORM,
CONF_TARGET,
@@ -75,13 +72,7 @@ from homeassistant.setup import async_setup_component
from homeassistant.util.unit_conversion import TemperatureConverter
from homeassistant.util.yaml.loader import parse_yaml
from tests.common import (
MockModule,
MockPlatform,
async_fire_time_changed,
mock_integration,
mock_platform,
)
from tests.common import MockModule, MockPlatform, mock_integration, mock_platform
from tests.typing import WebSocketGenerator
@@ -3119,7 +3110,6 @@ async def _arm_off_to_on_trigger(
entity_ids: list[str],
behavior: str,
calls: list[dict[str, Any]],
duration: dict[str, int] | None,
) -> CALLBACK_TYPE:
"""Set up _OffToOnTrigger via async_initialize_triggers."""
@@ -3131,14 +3121,10 @@ async def _arm_off_to_on_trigger(
mock_integration(hass, MockModule("test"))
mock_platform(hass, "test.trigger", Mock(async_get_triggers=async_get_triggers))
options: dict[str, Any] = {ATTR_BEHAVIOR: behavior}
if duration is not None:
options[CONF_FOR] = duration
trigger_config = {
CONF_PLATFORM: "test.off_to_on",
CONF_TARGET: {CONF_ENTITY_ID: entity_ids},
CONF_OPTIONS: options,
CONF_OPTIONS: {ATTR_BEHAVIOR: behavior},
}
log = logging.getLogger(__name__)
@@ -3172,23 +3158,18 @@ def _set_or_remove_state(
async def test_entity_trigger_fires_on_valid_transition(
hass: HomeAssistant, behavior: str
) -> None:
"""Test EntityTriggerBase fires immediately on a valid off→on transition without duration."""
"""Test EntityTriggerBase fires on a valid off→on transition."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration=None
)
unsub = await _arm_off_to_on_trigger(hass, [entity_id], behavior, calls)
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_id
assert calls[0]["from_state"].state == STATE_OFF
assert calls[0]["to_state"].state == STATE_ON
assert calls[0]["for"] is None
# Transition back and trigger again
calls.clear()
@@ -3216,9 +3197,7 @@ async def test_entity_trigger_from_invalid_initial_state(
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration=None
)
unsub = await _arm_off_to_on_trigger(hass, [entity_id], behavior, calls)
# Transition to "on" from the invalid initial state
_set_or_remove_state(hass, entity_id, STATE_ON)
@@ -3249,7 +3228,7 @@ async def test_entity_trigger_last_requires_all(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration=None
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls
)
# Turn only A on — not all match, should not fire
@@ -3277,7 +3256,7 @@ async def test_entity_trigger_first_requires_exactly_one(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration=None
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls
)
# Turn A on — exactly one matches, should fire
@@ -3298,7 +3277,7 @@ async def test_entity_trigger_first_requires_exactly_one(
[STATE_UNAVAILABLE, STATE_UNKNOWN],
ids=["unavailable", "unknown"],
)
async def test_entity_trigger_last_ignores_unavailable_and_unknown_entity(
async def test_entity_trigger_last_ignores_unavailable_and_unknownentity(
hass: HomeAssistant, invalid_state: str
) -> None:
"""Test behavior last: unavailable/unknown entities are excluded from check_all_match.
@@ -3318,7 +3297,7 @@ async def test_entity_trigger_last_ignores_unavailable_and_unknown_entity(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b, entity_c], BEHAVIOR_LAST, calls, duration=None
hass, [entity_a, entity_b, entity_c], BEHAVIOR_LAST, calls
)
# Turn A on — B is unavailable and skipped, only A is on → all doesn't match
@@ -3370,7 +3349,7 @@ async def test_entity_trigger_first_ignores_unavailable_and_unknown_entity(
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b, entity_c], BEHAVIOR_FIRST, calls, duration=None
hass, [entity_a, entity_b, entity_c], BEHAVIOR_FIRST, calls
)
# Turn A on — B is unavailable and skipped, only A matches → exactly one
@@ -3385,498 +3364,3 @@ async def test_entity_trigger_first_ignores_unavailable_and_unknown_entity(
assert len(calls) == 1
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
async def test_entity_trigger_with_duration(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test EntityTriggerBase waits for duration before firing."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
# Turn on — should NOT fire immediately
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Advance time past duration — should fire
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_id
assert calls[0]["from_state"].state == STATE_OFF
assert calls[0]["to_state"].state == STATE_ON
assert calls[0]["for"] == datetime.timedelta(seconds=5)
unsub()
@pytest.mark.parametrize("behavior", [BEHAVIOR_ANY, BEHAVIOR_FIRST, BEHAVIOR_LAST])
async def test_entity_trigger_duration_cancelled_on_state_change(
hass: HomeAssistant, freezer: FrozenDateTimeFactory, behavior: str
) -> None:
"""Test that the duration timer is cancelled if state changes back."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], behavior, calls, duration={"seconds": 5}
)
# Turn on, then back off before duration expires
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
# Advance past the original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_any_independent(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any tracks per-entity durations independently."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn A on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Turn B on 2 seconds later
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# After 5s from A's turn-on, A should fire
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_a
# After 5s from B's turn-on (2 more seconds), B should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 2
assert calls[1]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_duration_any_entity_off_cancels_only_that_entity(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any: turning off one entity doesn't cancel the other's timer."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds — cancels A's timer but not B's
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# After 5s total, B should fire but A should not
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
assert calls[0]["entity_id"] == entity_b
unsub()
async def test_entity_trigger_duration_last_requires_all(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: trigger fires only when ALL entities are on for duration."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn only A on — should not start timer (not all match)
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# Turn B on — now all match, timer starts
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_last_cancelled_when_one_turns_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: timer is cancelled when one entity turns off."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_last_timer_reset(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior last: timer resets when combined state goes off and back on."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_LAST, calls, duration={"seconds": 5}
)
# Turn both on — combined state "all on", timer starts
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# After 2 seconds, B turns off — combined state breaks, timer cancelled
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
# B turns back on — combined state restored, timer restarts
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# 4 seconds after restart (not enough) — should NOT fire
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# 1 more second (5 total from restart) — should fire
freezer.tick(datetime.timedelta(seconds=1))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_fires_when_any_on(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: trigger fires when first entity turns on for duration."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn A on — combined state goes to "at least one on", timer starts
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# Advance past duration — should fire
freezer.tick(datetime.timedelta(seconds=6))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_not_cancelled_by_second(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: second entity turning on doesn't restart timer."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn A on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
# Turn B on 3 seconds later — combined state was already "any on",
# so this should NOT restart the timer
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
assert len(calls) == 0
# 2 more seconds (5 total from A) — should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_not_cancelled_by_partial_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: one entity off doesn't cancel if another is still on."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
await hass.async_block_till_done()
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn A off after 2 seconds — combined state still "any on" (B is on)
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
await hass.async_block_till_done()
# Advance past duration — should still fire (combined state never went to "none on")
freezer.tick(datetime.timedelta(seconds=4))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
async def test_entity_trigger_duration_first_cancelled_when_all_off(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior first: timer cancelled when ALL entities turn off."""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], BEHAVIOR_FIRST, calls, duration={"seconds": 5}
)
# Turn both on
hass.states.async_set(entity_a, STATE_ON)
hass.states.async_set(entity_b, STATE_ON)
await hass.async_block_till_done()
# Turn both off after 2 seconds — combined state goes to "none on"
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
hass.states.async_set(entity_a, STATE_OFF)
hass.states.async_set(entity_b, STATE_OFF)
await hass.async_block_till_done()
# Advance past original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
unsub()
async def test_entity_trigger_duration_any_retrigger_resets_timer(
hass: HomeAssistant, freezer: FrozenDateTimeFactory
) -> None:
"""Test behavior any: turning an entity off and on resets its timer."""
entity_id = "test.entity_1"
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_id], BEHAVIOR_ANY, calls, duration={"seconds": 5}
)
# Turn on
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# After 3 seconds, turn off and on again — resets the timer
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
hass.states.async_set(entity_id, STATE_OFF)
await hass.async_block_till_done()
hass.states.async_set(entity_id, STATE_ON)
await hass.async_block_till_done()
# 3 more seconds (6 from start, but only 3 from retrigger) — should NOT fire
freezer.tick(datetime.timedelta(seconds=3))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 0
# 2 more seconds (5 from retrigger) — should fire
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == 1
unsub()
@pytest.mark.parametrize(
("behavior", "expected_calls"),
[(BEHAVIOR_ANY, 0), (BEHAVIOR_FIRST, 0), (BEHAVIOR_LAST, 1)],
)
@pytest.mark.parametrize(
"invalid_state",
[STATE_UNAVAILABLE, STATE_UNKNOWN, None],
ids=["unavailable", "unknown", "removed"],
)
async def test_entity_trigger_duration_cancelled_on_invalid_state(
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
behavior: str,
expected_calls: int,
invalid_state: str | None,
) -> None:
"""Test if the duration timer is cancelled if entity becomes unavailable, unknown, or is removed.
This is expected to happen in first and any modes, but not in last mode.
"""
entity_a = "test.entity_a"
entity_b = "test.entity_b"
_set_or_remove_state(hass, entity_a, STATE_OFF)
_set_or_remove_state(hass, entity_b, STATE_OFF)
await hass.async_block_till_done()
calls: list[dict[str, Any]] = []
unsub = await _arm_off_to_on_trigger(
hass, [entity_a, entity_b], behavior, calls, duration={"seconds": 5}
)
# Turn on the entities needed to start the timer
_set_or_remove_state(hass, entity_a, STATE_ON)
await hass.async_block_till_done()
if behavior == BEHAVIOR_LAST:
_set_or_remove_state(hass, entity_b, STATE_ON)
await hass.async_block_till_done()
# Entity A becomes invalid during the wait
freezer.tick(datetime.timedelta(seconds=2))
async_fire_time_changed(hass)
_set_or_remove_state(hass, entity_a, invalid_state)
await hass.async_block_till_done()
# Advance past the original duration — should NOT fire
freezer.tick(datetime.timedelta(seconds=10))
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert len(calls) == expected_calls
unsub()