Compare commits

..

7 Commits

Author SHA1 Message Date
Erik
7917f0a2b9 Adjust entity condition strings 2026-01-16 11:04:55 +01:00
epenet
7c6a31861e Improve type hints in egardia (#161048) 2026-01-16 10:08:24 +01:00
Robert Resch
b2b25ca28c Revert "Add SmartThings media-player audio notifications" (#161049) 2026-01-16 10:06:30 +01:00
epenet
ad9efab16a Improve type hints in concord232 (#161045) 2026-01-16 09:46:53 +01:00
Matthias Alphart
e967d33911 Update knx-frontend to 2026.1.15.112308 (#161004) 2026-01-16 09:37:09 +01:00
epenet
86bacdbdde Use shorthand attributes in oasa_telematics (#160990) 2026-01-16 09:34:51 +01:00
Robert Resch
644a40674d Make shebang matcher stricter (#160986) 2026-01-16 09:21:19 +01:00
26 changed files with 112 additions and 1875 deletions

View File

@@ -247,17 +247,11 @@ jobs:
&& github.event.inputs.audit-licenses-only != 'true'
steps:
- *checkout
- name: Register yamllint problem matcher
- name: Register problem matchers
run: |
echo "::add-matcher::.github/workflows/matchers/yamllint.json"
- name: Register check-json problem matcher
run: |
echo "::add-matcher::.github/workflows/matchers/check-json.json"
- name: Register check executables problem matcher
run: |
echo "::add-matcher::.github/workflows/matchers/check-executables-have-shebangs.json"
- name: Register codespell problem matcher
run: |
echo "::add-matcher::.github/workflows/matchers/codespell.json"
- name: Run prek
uses: j178/prek-action@9d6a3097e0c1865ecce00cfb89fe80f2ee91b547 # v1.0.12

View File

@@ -4,7 +4,7 @@
"owner": "check-executables-have-shebangs",
"pattern": [
{
"regexp": "^(.+):\\s(.+)$",
"regexp": "^(.+):\\s(marked executable but has no \\(or invalid\\) shebang!.*)$",
"file": 1,
"message": 2
}

View File

@@ -14,7 +14,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "If an alarm is armed"
"name": "Alarm is armed"
},
"is_armed_away": {
"description": "Tests if one or more alarms are armed in away mode.",
@@ -24,7 +24,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "If an alarm is armed away"
"name": "Alarm is armed away"
},
"is_armed_home": {
"description": "Tests if one or more alarms are armed in home mode.",
@@ -34,7 +34,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "If an alarm is armed home"
"name": "Alarm is armed home"
},
"is_armed_night": {
"description": "Tests if one or more alarms are armed in night mode.",
@@ -44,7 +44,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "If an alarm is armed night"
"name": "Alarm is armed night"
},
"is_armed_vacation": {
"description": "Tests if one or more alarms are armed in vacation mode.",
@@ -54,7 +54,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "If an alarm is armed vacation"
"name": "Alarm is armed vacation"
},
"is_disarmed": {
"description": "Tests if one or more alarms are disarmed.",
@@ -64,7 +64,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "If an alarm is disarmed"
"name": "Alarm is disarmed"
},
"is_triggered": {
"description": "Tests if one or more alarms are triggered.",
@@ -74,7 +74,7 @@
"name": "[%key:component::alarm_control_panel::common::condition_behavior_name%]"
}
},
"name": "If an alarm is triggered"
"name": "Alarm is triggered"
}
},
"device_automation": {

View File

@@ -14,7 +14,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "If a satellite is idle"
"name": "Satellite is idle"
},
"is_listening": {
"description": "Tests if one or more Assist satellites are listening.",
@@ -24,7 +24,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "If a satellite is listening"
"name": "Satellite is listening"
},
"is_processing": {
"description": "Tests if one or more Assist satellites are processing.",
@@ -34,7 +34,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "If a satellite is processing"
"name": "Satellite is processing"
},
"is_responding": {
"description": "Tests if one or more Assist satellites are responding.",
@@ -44,7 +44,7 @@
"name": "[%key:component::assist_satellite::common::condition_behavior_name%]"
}
},
"name": "If a satellite is responding"
"name": "Satellite is responding"
}
},
"entity_component": {

View File

@@ -19,7 +19,6 @@ import attr
from hass_nabucasa import AlreadyConnectedError, Cloud, auth
from hass_nabucasa.const import STATE_DISCONNECTED
from hass_nabucasa.voice_data import TTS_VOICES
import psutil_home_assistant as ha_psutil
import voluptuous as vol
from homeassistant.components import websocket_api
@@ -28,7 +27,6 @@ from homeassistant.components.alexa import (
errors as alexa_errors,
)
from homeassistant.components.google_assistant import helpers as google_helpers
from homeassistant.components.hassio import get_addons_stats, get_supervisor_info
from homeassistant.components.homeassistant import exposed_entities
from homeassistant.components.http import KEY_HASS, HomeAssistantView, require_admin
from homeassistant.components.http.data_validator import RequestDataValidator
@@ -39,7 +37,6 @@ from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.hassio import is_hassio
from homeassistant.loader import (
async_get_custom_components,
async_get_loaded_integration,
@@ -574,11 +571,6 @@ class DownloadSupportPackageView(HomeAssistantView):
"</details>\n\n"
)
markdown += await self._get_host_resources_markdown(hass)
if is_hassio(hass):
markdown += await self._get_addon_resources_markdown(hass)
log_handler = hass.data[DATA_CLOUD_LOG_HANDLER]
logs = "\n".join(await log_handler.get_logs(hass))
markdown += (
@@ -592,103 +584,6 @@ class DownloadSupportPackageView(HomeAssistantView):
return markdown
async def _get_host_resources_markdown(self, hass: HomeAssistant) -> str:
"""Get host resource usage markdown using psutil."""
def _collect_system_stats() -> dict[str, Any]:
"""Collect system stats."""
psutil_wrapper = ha_psutil.PsutilWrapper()
psutil_mod = psutil_wrapper.psutil
cpu_percent = psutil_mod.cpu_percent(interval=0.1)
memory = psutil_mod.virtual_memory()
disk = psutil_mod.disk_usage("/")
return {
"cpu_percent": cpu_percent,
"memory_total": memory.total,
"memory_used": memory.used,
"memory_available": memory.available,
"memory_percent": memory.percent,
"disk_total": disk.total,
"disk_used": disk.used,
"disk_free": disk.free,
"disk_percent": disk.percent,
}
markdown = ""
try:
stats = await hass.async_add_executor_job(_collect_system_stats)
markdown += "## Host resource usage\n\n"
markdown += "Resource | Value\n"
markdown += "--- | ---\n"
markdown += f"CPU usage | {stats['cpu_percent']}%\n"
memory_total_gb = round(stats["memory_total"] / (1024**3), 2)
memory_used_gb = round(stats["memory_used"] / (1024**3), 2)
memory_available_gb = round(stats["memory_available"] / (1024**3), 2)
markdown += f"Memory total | {memory_total_gb} GB\n"
markdown += (
f"Memory used | {memory_used_gb} GB ({stats['memory_percent']}%)\n"
)
markdown += f"Memory available | {memory_available_gb} GB\n"
disk_total_gb = round(stats["disk_total"] / (1024**3), 2)
disk_used_gb = round(stats["disk_used"] / (1024**3), 2)
disk_free_gb = round(stats["disk_free"] / (1024**3), 2)
markdown += f"Disk total | {disk_total_gb} GB\n"
markdown += f"Disk used | {disk_used_gb} GB ({stats['disk_percent']}%)\n"
markdown += f"Disk free | {disk_free_gb} GB\n"
markdown += "\n"
except Exception: # noqa: BLE001
# Broad exception catch for robustness in support package generation
markdown += "## Host resource usage\n\n"
markdown += "Unable to collect host resource information\n\n"
return markdown
async def _get_addon_resources_markdown(self, hass: HomeAssistant) -> str:
"""Get add-on resource usage markdown for hassio."""
markdown = ""
try:
supervisor_info = get_supervisor_info(hass) or {}
addons_stats = get_addons_stats(hass)
addons = supervisor_info.get("addons", [])
if addons:
markdown += "## Add-on resource usage\n\n"
markdown += "<details><summary>Add-on resources</summary>\n\n"
markdown += "Add-on | Version | State | CPU | Memory\n"
markdown += "--- | --- | --- | --- | ---\n"
for addon in addons:
slug = addon.get("slug", "unknown")
name = addon.get("name", slug)
version = addon.get("version", "unknown")
state = addon.get("state", "unknown")
addon_stats = addons_stats.get(slug, {})
cpu = addon_stats.get("cpu_percent")
memory = addon_stats.get("memory_percent")
cpu_str = f"{cpu}%" if cpu is not None else "N/A"
memory_str = f"{memory}%" if memory is not None else "N/A"
markdown += (
f"{name} | {version} | {state} | {cpu_str} | {memory_str}\n"
)
markdown += "\n</details>\n\n"
except Exception: # noqa: BLE001
# Broad exception catch for robustness in support package generation
markdown += "## Add-on resource usage\n\n"
markdown += "Unable to collect add-on resource information\n\n"
return markdown
async def get(self, request: web.Request) -> web.Response:
"""Download support package file."""

View File

@@ -5,8 +5,7 @@
"alexa",
"assist_pipeline",
"backup",
"google_assistant",
"hassio"
"google_assistant"
],
"codeowners": ["@home-assistant/cloud"],
"dependencies": ["auth", "http", "repairs", "webhook", "web_rtc"],
@@ -14,6 +13,6 @@
"integration_type": "system",
"iot_class": "cloud_push",
"loggers": ["acme", "hass_nabucasa", "snitun"],
"requirements": ["hass-nabucasa==1.9.0", "psutil-home-assistant==0.0.1"],
"requirements": ["hass-nabucasa==1.9.0"],
"single_config_entry": true
}

View File

@@ -49,11 +49,11 @@ def setup_platform(
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the Concord232 alarm control panel platform."""
name = config[CONF_NAME]
code = config.get(CONF_CODE)
mode = config[CONF_MODE]
host = config[CONF_HOST]
port = config[CONF_PORT]
name: str = config[CONF_NAME]
code: str | None = config.get(CONF_CODE)
mode: str = config[CONF_MODE]
host: str = config[CONF_HOST]
port: int = config[CONF_PORT]
url = f"http://{host}:{port}"
@@ -72,7 +72,7 @@ class Concord232Alarm(AlarmControlPanelEntity):
| AlarmControlPanelEntityFeature.ARM_AWAY
)
def __init__(self, url, name, code, mode):
def __init__(self, url: str, name: str, code: str | None, mode: str) -> None:
"""Initialize the Concord232 alarm panel."""
self._attr_name = name
@@ -125,7 +125,7 @@ class Concord232Alarm(AlarmControlPanelEntity):
return
self._alarm.arm("away")
def _validate_code(self, code, state):
def _validate_code(self, code: str | None, state: AlarmControlPanelState) -> bool:
"""Validate given code."""
if self._code is None:
return True

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import datetime
import logging
from typing import Any
from concord232 import client as concord232_client
import requests
@@ -29,8 +30,7 @@ CONF_ZONE_TYPES = "zone_types"
DEFAULT_HOST = "localhost"
DEFAULT_NAME = "Alarm"
DEFAULT_PORT = "5007"
DEFAULT_SSL = False
DEFAULT_PORT = 5007
SCAN_INTERVAL = datetime.timedelta(seconds=10)
@@ -56,10 +56,10 @@ def setup_platform(
) -> None:
"""Set up the Concord232 binary sensor platform."""
host = config[CONF_HOST]
port = config[CONF_PORT]
exclude = config[CONF_EXCLUDE_ZONES]
zone_types = config[CONF_ZONE_TYPES]
host: str = config[CONF_HOST]
port: int = config[CONF_PORT]
exclude: list[int] = config[CONF_EXCLUDE_ZONES]
zone_types: dict[int, BinarySensorDeviceClass] = config[CONF_ZONE_TYPES]
sensors = []
try:
@@ -84,7 +84,6 @@ def setup_platform(
if zone["number"] not in exclude:
sensors.append(
Concord232ZoneSensor(
hass,
client,
zone,
zone_types.get(zone["number"], get_opening_type(zone)),
@@ -110,26 +109,25 @@ def get_opening_type(zone):
class Concord232ZoneSensor(BinarySensorEntity):
"""Representation of a Concord232 zone as a sensor."""
def __init__(self, hass, client, zone, zone_type):
def __init__(
self,
client: concord232_client.Client,
zone: dict[str, Any],
zone_type: BinarySensorDeviceClass,
) -> None:
"""Initialize the Concord232 binary sensor."""
self._hass = hass
self._client = client
self._zone = zone
self._number = zone["number"]
self._zone_type = zone_type
self._attr_device_class = zone_type
@property
def device_class(self) -> BinarySensorDeviceClass:
"""Return the class of this sensor, from DEVICE_CLASSES."""
return self._zone_type
@property
def name(self):
def name(self) -> str:
"""Return the name of the binary sensor."""
return self._zone["name"]
@property
def is_on(self):
def is_on(self) -> bool:
"""Return true if the binary sensor is on."""
# True means "faulted" or "open" or "abnormal state"
return bool(self._zone["state"] != "Normal")
@@ -145,5 +143,5 @@ class Concord232ZoneSensor(BinarySensorEntity):
if hasattr(self._client, "zones"):
self._zone = next(
(x for x in self._client.zones if x["number"] == self._number), None
x for x in self._client.zones if x["number"] == self._number
)

View File

@@ -18,6 +18,7 @@ from homeassistant.const import (
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_validation as cv, discovery
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey
_LOGGER = logging.getLogger(__name__)
@@ -35,7 +36,7 @@ DEFAULT_REPORT_SERVER_PORT = 52010
DEFAULT_VERSION = "GATE-01"
DOMAIN = "egardia"
EGARDIA_DEVICE = "egardiadevice"
EGARDIA_DEVICE: HassKey[egardiadevice.EgardiaDevice] = HassKey(DOMAIN)
EGARDIA_NAME = "egardianame"
EGARDIA_REPORT_SERVER_CODES = "egardia_rs_codes"
EGARDIA_REPORT_SERVER_ENABLED = "egardia_rs_enabled"

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import logging
from pythonegardia.egardiadevice import EgardiaDevice
import requests
from homeassistant.components.alarm_control_panel import (
@@ -11,6 +12,7 @@ from homeassistant.components.alarm_control_panel import (
AlarmControlPanelEntityFeature,
AlarmControlPanelState,
)
from homeassistant.const import CONF_NAME
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
@@ -47,10 +49,10 @@ def setup_platform(
if discovery_info is None:
return
device = EgardiaAlarm(
discovery_info["name"],
discovery_info[CONF_NAME],
hass.data[EGARDIA_DEVICE],
discovery_info[CONF_REPORT_SERVER_ENABLED],
discovery_info.get(CONF_REPORT_SERVER_CODES),
discovery_info[CONF_REPORT_SERVER_CODES],
discovery_info[CONF_REPORT_SERVER_PORT],
)
@@ -67,8 +69,13 @@ class EgardiaAlarm(AlarmControlPanelEntity):
)
def __init__(
self, name, egardiasystem, rs_enabled=False, rs_codes=None, rs_port=52010
):
self,
name: str,
egardiasystem: EgardiaDevice,
rs_enabled: bool,
rs_codes: dict[str, list[str]],
rs_port: int,
) -> None:
"""Initialize the Egardia alarm."""
self._attr_name = name
self._egardiasystem = egardiasystem
@@ -85,9 +92,7 @@ class EgardiaAlarm(AlarmControlPanelEntity):
@property
def should_poll(self) -> bool:
"""Poll if no report server is enabled."""
if not self._rs_enabled:
return True
return False
return not self._rs_enabled
def handle_status_event(self, event):
"""Handle the Egardia system status event."""

View File

@@ -2,11 +2,12 @@
from __future__ import annotations
from pythonegardia.egardiadevice import EgardiaDevice
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
@@ -51,30 +52,20 @@ async def async_setup_platform(
class EgardiaBinarySensor(BinarySensorEntity):
"""Represents a sensor based on an Egardia sensor (IR, Door Contact)."""
def __init__(self, sensor_id, name, egardia_system, device_class):
def __init__(
self,
sensor_id: str,
name: str,
egardia_system: EgardiaDevice,
device_class: BinarySensorDeviceClass | None,
) -> None:
"""Initialize the sensor device."""
self._id = sensor_id
self._name = name
self._state = None
self._device_class = device_class
self._attr_name = name
self._attr_device_class = device_class
self._egardia_system = egardia_system
def update(self) -> None:
"""Update the status."""
egardia_input = self._egardia_system.getsensorstate(self._id)
self._state = STATE_ON if egardia_input else STATE_OFF
@property
def name(self):
"""Return the name of the device."""
return self._name
@property
def is_on(self):
"""Whether the device is switched on."""
return self._state == STATE_ON
@property
def device_class(self) -> BinarySensorDeviceClass | None:
"""Return the device class."""
return self._device_class
self._attr_is_on = bool(egardia_input)

View File

@@ -14,7 +14,7 @@
"name": "[%key:component::fan::common::condition_behavior_name%]"
}
},
"name": "If a fan is off"
"name": "Fan is off"
},
"is_on": {
"description": "Tests if one or more fans are on.",
@@ -24,7 +24,7 @@
"name": "[%key:component::fan::common::condition_behavior_name%]"
}
},
"name": "If a fan is on"
"name": "Fan is on"
}
},
"device_automation": {

View File

@@ -13,7 +13,7 @@
"requirements": [
"xknx==3.14.0",
"xknxproject==3.8.2",
"knx-frontend==2025.12.30.151231"
"knx-frontend==2026.1.15.112308"
],
"single_config_entry": true
}

View File

@@ -49,7 +49,7 @@
"name": "[%key:component::light::common::condition_behavior_name%]"
}
},
"name": "If a light is off"
"name": "Light is off"
},
"is_on": {
"description": "Tests if one or more lights are on.",
@@ -59,7 +59,7 @@
"name": "[%key:component::light::common::condition_behavior_name%]"
}
},
"name": "If a light is on"
"name": "Light is on"
}
},
"device_automation": {

View File

@@ -2,9 +2,10 @@
from __future__ import annotations
from datetime import timedelta
from datetime import datetime, timedelta
import logging
from operator import itemgetter
from typing import Any
import oasatelematics
import voluptuous as vol
@@ -55,9 +56,9 @@ def setup_platform(
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up the OASA Telematics sensor."""
name = config[CONF_NAME]
stop_id = config[CONF_STOP_ID]
route_id = config.get(CONF_ROUTE_ID)
name: str = config[CONF_NAME]
stop_id: str = config[CONF_STOP_ID]
route_id: str = config[CONF_ROUTE_ID]
data = OASATelematicsData(stop_id, route_id)
@@ -68,42 +69,31 @@ class OASATelematicsSensor(SensorEntity):
"""Implementation of the OASA Telematics sensor."""
_attr_attribution = "Data retrieved from telematics.oasa.gr"
_attr_device_class = SensorDeviceClass.TIMESTAMP
_attr_icon = "mdi:bus"
def __init__(self, data, stop_id, route_id, name):
def __init__(
self, data: OASATelematicsData, stop_id: str, route_id: str, name: str
) -> None:
"""Initialize the sensor."""
self.data = data
self._name = name
self._attr_name = name
self._stop_id = stop_id
self._route_id = route_id
self._name_data = self._times = self._state = None
self._name_data: dict[str, Any] | None = None
self._times: list[dict[str, Any]] | None = None
@property
def name(self):
"""Return the name of the sensor."""
return self._name
@property
def device_class(self) -> SensorDeviceClass:
"""Return the class of this sensor."""
return SensorDeviceClass.TIMESTAMP
@property
def native_value(self):
"""Return the state of the sensor."""
return self._state
@property
def extra_state_attributes(self):
def extra_state_attributes(self) -> dict[str, Any]:
"""Return the state attributes."""
params = {}
if self._times is not None:
next_arrival_data = self._times[0]
if ATTR_NEXT_ARRIVAL in next_arrival_data:
next_arrival = next_arrival_data[ATTR_NEXT_ARRIVAL]
next_arrival: datetime = next_arrival_data[ATTR_NEXT_ARRIVAL]
params.update({ATTR_NEXT_ARRIVAL: next_arrival.isoformat()})
if len(self._times) > 1:
second_next_arrival_time = self._times[1][ATTR_NEXT_ARRIVAL]
second_next_arrival_time: datetime = self._times[1][ATTR_NEXT_ARRIVAL]
if second_next_arrival_time is not None:
second_arrival = second_next_arrival_time
params.update(
@@ -115,12 +105,13 @@ class OASATelematicsSensor(SensorEntity):
ATTR_STOP_ID: self._stop_id,
}
)
params.update(
{
ATTR_ROUTE_NAME: self._name_data[ATTR_ROUTE_NAME],
ATTR_STOP_NAME: self._name_data[ATTR_STOP_NAME],
}
)
if self._name_data is not None:
params.update(
{
ATTR_ROUTE_NAME: self._name_data[ATTR_ROUTE_NAME],
ATTR_STOP_NAME: self._name_data[ATTR_STOP_NAME],
}
)
return {k: v for k, v in params.items() if v}
def update(self) -> None:
@@ -130,7 +121,7 @@ class OASATelematicsSensor(SensorEntity):
self._name_data = self.data.name_data
next_arrival_data = self._times[0]
if ATTR_NEXT_ARRIVAL in next_arrival_data:
self._state = next_arrival_data[ATTR_NEXT_ARRIVAL]
self._attr_native_value = next_arrival_data[ATTR_NEXT_ARRIVAL]
class OASATelematicsData:

View File

@@ -1,265 +0,0 @@
"""Audio helper for SmartThings audio notifications."""
from __future__ import annotations
import asyncio
import contextlib
from dataclasses import dataclass
from datetime import timedelta
import logging
import secrets
from aiohttp import hdrs, web
from homeassistant.components import ffmpeg
from homeassistant.components.http import HomeAssistantView
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.network import NoURLAvailableError, get_url
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
PCM_SAMPLE_RATE = 24000
PCM_SAMPLE_WIDTH = 2
PCM_CHANNELS = 1
PCM_MIME = "audio/L16"
PCM_EXTENSION = ".pcm"
WARNING_DURATION_SECONDS = 40
FFMPEG_MAX_DURATION_SECONDS = 10 * 60
TRANSCODE_TIMEOUT_SECONDS = WARNING_DURATION_SECONDS + 10
_TRUNCATION_EPSILON = 1 / PCM_SAMPLE_RATE
ENTRY_TTL = timedelta(minutes=5)
MAX_STORED_ENTRIES = 4 # Limit the number of cached notifications.
PCM_FRAME_BYTES = PCM_SAMPLE_WIDTH * PCM_CHANNELS
DATA_AUDIO_MANAGER = "audio_manager"
class SmartThingsAudioError(HomeAssistantError):
"""Error raised when SmartThings audio preparation fails."""
@dataclass
class _AudioEntry:
"""Stored PCM audio entry."""
pcm: bytes
created: float
expires: float
class SmartThingsAudioManager(HomeAssistantView):
"""Manage PCM proxy URLs for SmartThings audio notifications."""
url = "/api/smartthings/audio/{token}"
name = "api:smartthings:audio"
requires_auth = False
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize the manager."""
self.hass = hass
self._entries: dict[str, _AudioEntry] = {}
self._cleanup_handle: asyncio.TimerHandle | None = None
async def async_prepare_notification(self, source_url: str) -> str:
"""Generate an externally accessible PCM URL for SmartThings."""
pcm, duration, truncated = await self._transcode_to_pcm(source_url)
if not pcm:
raise SmartThingsAudioError("Converted audio is empty")
if truncated:
_LOGGER.warning(
"SmartThings audio notification truncated to %s seconds (output length %.1fs); longer sources may be cut off",
FFMPEG_MAX_DURATION_SECONDS,
duration,
)
elif duration > WARNING_DURATION_SECONDS:
_LOGGER.warning(
"SmartThings audio notification is %.1fs; playback over %s seconds may be cut off",
duration,
WARNING_DURATION_SECONDS,
)
token = secrets.token_urlsafe(
16
) # Shorter tokens avoid playback issues in some devices.
now = self.hass.loop.time()
entry = _AudioEntry(
pcm=pcm,
created=now,
expires=now + ENTRY_TTL.total_seconds(),
)
self._cleanup(now)
while token in self._entries:
token = secrets.token_urlsafe(16)
self._entries[token] = entry
while len(self._entries) > MAX_STORED_ENTRIES:
dropped_token = next(iter(self._entries))
self._entries.pop(dropped_token, None)
_LOGGER.debug(
"Dropped oldest SmartThings audio token %s to cap cache",
dropped_token,
)
self._schedule_cleanup()
path = f"/api/smartthings/audio/{token}{PCM_EXTENSION}"
try:
base_url = get_url(
self.hass,
allow_internal=True,
allow_external=True,
allow_cloud=True,
prefer_external=False, # Prevent NAT loopback failures; may break non-local access for devices outside the LAN.
prefer_cloud=True,
)
except NoURLAvailableError as err:
self._entries.pop(token, None)
self._schedule_cleanup()
raise SmartThingsAudioError(
"SmartThings audio notifications require an accessible Home Assistant URL"
) from err
return f"{base_url}{path}"
async def get(self, request: web.Request, token: str) -> web.StreamResponse:
"""Serve a PCM audio response."""
token = token.removesuffix(PCM_EXTENSION)
now = self.hass.loop.time()
self._cleanup(now)
self._schedule_cleanup()
entry = self._entries.get(token)
if entry is None:
raise web.HTTPNotFound
_LOGGER.debug("Serving SmartThings audio token=%s to %s", token, request.remote)
response = web.Response(body=entry.pcm, content_type=PCM_MIME)
response.headers[hdrs.CACHE_CONTROL] = "no-store"
response.headers[hdrs.ACCEPT_RANGES] = "none"
response.headers[hdrs.CONTENT_DISPOSITION] = (
f'inline; filename="{token}{PCM_EXTENSION}"'
)
return response
async def _transcode_to_pcm(self, source_url: str) -> tuple[bytes, float, bool]:
"""Use ffmpeg to convert the source media to 24kHz mono PCM."""
manager = ffmpeg.get_ffmpeg_manager(self.hass)
command = [
manager.binary,
"-hide_banner",
"-loglevel",
"error",
"-nostdin",
"-i",
source_url,
"-ac",
str(PCM_CHANNELS),
"-ar",
str(PCM_SAMPLE_RATE),
"-c:a",
"pcm_s16le",
"-t",
str(FFMPEG_MAX_DURATION_SECONDS),
"-f",
"s16le",
"pipe:1",
]
try:
process = await asyncio.create_subprocess_exec(
*command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
except FileNotFoundError as err:
raise SmartThingsAudioError(
"FFmpeg is required for SmartThings audio notifications"
) from err
try:
stdout, stderr = await asyncio.wait_for(
process.communicate(), timeout=TRANSCODE_TIMEOUT_SECONDS
)
except TimeoutError:
_LOGGER.warning(
"FFmpeg timed out after %s seconds while converting SmartThings audio from %s",
TRANSCODE_TIMEOUT_SECONDS,
source_url,
)
with contextlib.suppress(ProcessLookupError):
process.kill()
stdout, stderr = await process.communicate()
if process.returncode != 0:
message = stderr.decode().strip() or "unknown error"
_LOGGER.error(
"FFmpeg failed to convert SmartThings audio from %s: %s",
source_url,
message,
)
raise SmartThingsAudioError(
"Unable to convert audio to PCM for SmartThings"
)
if not stdout:
return b"", 0.0, False
frame_count, remainder = divmod(len(stdout), PCM_FRAME_BYTES)
if remainder:
_LOGGER.debug(
"SmartThings audio conversion produced misaligned PCM: dropping %s extra byte(s)",
remainder,
)
stdout = stdout[: len(stdout) - remainder]
frame_count = len(stdout) // PCM_FRAME_BYTES
if frame_count == 0:
return b"", 0.0, False
duration = frame_count / PCM_SAMPLE_RATE
truncated = duration >= (FFMPEG_MAX_DURATION_SECONDS - _TRUNCATION_EPSILON)
return stdout, duration, truncated
@callback
def _schedule_cleanup(self) -> None:
"""Schedule the next cleanup based on entry expiry."""
if self._cleanup_handle is not None:
self._cleanup_handle.cancel()
self._cleanup_handle = None
if not self._entries:
return
next_expiry = min(entry.expires for entry in self._entries.values())
delay = max(0.0, next_expiry - self.hass.loop.time())
self._cleanup_handle = self.hass.loop.call_later(delay, self._cleanup_callback)
@callback
def _cleanup_callback(self) -> None:
"""Run a cleanup pass."""
self._cleanup_handle = None
now = self.hass.loop.time()
self._cleanup(now)
self._schedule_cleanup()
def _cleanup(self, now: float) -> None:
"""Remove expired entries."""
expired = [
token for token, entry in self._entries.items() if entry.expires <= now
]
for token in expired:
self._entries.pop(token, None)
async def async_get_audio_manager(hass: HomeAssistant) -> SmartThingsAudioManager:
"""Return the shared SmartThings audio manager."""
domain_data = hass.data.setdefault(DOMAIN, {})
if (manager := domain_data.get(DATA_AUDIO_MANAGER)) is None:
manager = SmartThingsAudioManager(hass)
hass.http.register_view(manager)
domain_data[DATA_AUDIO_MANAGER] = manager
return manager

View File

@@ -3,7 +3,7 @@
"name": "SmartThings",
"codeowners": ["@joostlek"],
"config_flow": true,
"dependencies": ["application_credentials", "http", "ffmpeg"],
"dependencies": ["application_credentials"],
"dhcp": [
{
"hostname": "st*",

View File

@@ -6,22 +6,17 @@ from typing import Any
from pysmartthings import Attribute, Capability, Category, Command, SmartThings
from homeassistant.components import media_source
from homeassistant.components.media_player import (
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
RepeatMode,
async_process_play_media_url,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from . import FullDevice, SmartThingsConfigEntry
from .audio import SmartThingsAudioError, async_get_audio_manager
from .const import MAIN
from .entity import SmartThingsEntity
@@ -89,7 +84,6 @@ class SmartThingsMediaPlayer(SmartThingsEntity, MediaPlayerEntity):
Capability.AUDIO_MUTE,
Capability.AUDIO_TRACK_DATA,
Capability.AUDIO_VOLUME,
Capability.AUDIO_NOTIFICATION,
Capability.MEDIA_INPUT_SOURCE,
Capability.MEDIA_PLAYBACK,
Capability.MEDIA_PLAYBACK_REPEAT,
@@ -134,8 +128,6 @@ class SmartThingsMediaPlayer(SmartThingsEntity, MediaPlayerEntity):
flags |= MediaPlayerEntityFeature.SHUFFLE_SET
if self.supports_capability(Capability.MEDIA_PLAYBACK_REPEAT):
flags |= MediaPlayerEntityFeature.REPEAT_SET
if self.supports_capability(Capability.AUDIO_NOTIFICATION):
flags |= MediaPlayerEntityFeature.PLAY_MEDIA
return flags
async def async_turn_off(self, **kwargs: Any) -> None:
@@ -241,40 +233,6 @@ class SmartThingsMediaPlayer(SmartThingsEntity, MediaPlayerEntity):
argument=HA_REPEAT_MODE_TO_SMARTTHINGS[repeat],
)
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play media using SmartThings audio notifications."""
if not self.supports_capability(Capability.AUDIO_NOTIFICATION):
raise HomeAssistantError("Device does not support audio notifications")
if media_type not in (MediaType.MUSIC,):
raise HomeAssistantError(
"Unsupported media type for SmartThings audio notification"
)
if media_source.is_media_source_id(media_id):
play_item = await media_source.async_resolve_media(
self.hass, media_id, self.entity_id
)
media_id = async_process_play_media_url(self.hass, play_item.url)
else:
media_id = async_process_play_media_url(self.hass, media_id)
audio_manager = await async_get_audio_manager(self.hass)
try:
proxy_url = await audio_manager.async_prepare_notification(media_id)
except SmartThingsAudioError as err:
raise HomeAssistantError(str(err)) from err
command = Command("playTrackAndResume")
await self.execute_device_command(
Capability.AUDIO_NOTIFICATION,
command,
argument=[proxy_url],
)
@property
def media_title(self) -> str | None:
"""Title of current playing media."""

3
requirements_all.txt generated
View File

@@ -1351,7 +1351,7 @@ kiwiki-client==0.1.1
knocki==0.4.2
# homeassistant.components.knx
knx-frontend==2025.12.30.151231
knx-frontend==2026.1.15.112308
# homeassistant.components.konnected
konnected==1.2.0
@@ -1776,7 +1776,6 @@ prowlpy==1.1.1
# homeassistant.components.proxmoxve
proxmoxer==2.0.1
# homeassistant.components.cloud
# homeassistant.components.hardware
# homeassistant.components.recorder
# homeassistant.components.systemmonitor

View File

@@ -1185,7 +1185,7 @@ kegtron-ble==1.0.2
knocki==0.4.2
# homeassistant.components.knx
knx-frontend==2025.12.30.151231
knx-frontend==2026.1.15.112308
# homeassistant.components.konnected
konnected==1.2.0
@@ -1522,7 +1522,6 @@ prometheus-client==0.21.0
# homeassistant.components.prowl
prowlpy==1.1.1
# homeassistant.components.cloud
# homeassistant.components.hardware
# homeassistant.components.recorder
# homeassistant.components.systemmonitor

View File

@@ -87,18 +87,6 @@
</details>
## Host resource usage
Resource | Value
--- | ---
CPU usage | 25.5%
Memory total | 16.0 GB
Memory used | 8.0 GB (50.0%)
Memory available | 8.0 GB
Disk total | 500.0 GB
Disk used | 200.0 GB (40.0%)
Disk free | 300.0 GB
## Full logs
<details><summary>Logs</summary>
@@ -193,18 +181,6 @@
</details>
## Host resource usage
Resource | Value
--- | ---
CPU usage | 25.5%
Memory total | 16.0 GB
Memory used | 8.0 GB (50.0%)
Memory available | 8.0 GB
Disk total | 500.0 GB
Disk used | 200.0 GB (40.0%)
Disk free | 300.0 GB
## Full logs
<details><summary>Logs</summary>
@@ -220,252 +196,6 @@
'''
# ---
# name: test_download_support_package_hassio
'''
## System Information
version | core-2025.2.0
--- | ---
installation_type | Home Assistant OS
dev | False
hassio | True
docker | True
container_arch | aarch64
user | root
virtualenv | False
python_version | 3.13.1
os_name | Linux
os_version | 6.12.9
arch | aarch64
timezone | US/Pacific
config_dir | config
## Active Integrations
Built-in integrations: 23
Custom integrations: 1
<details><summary>Built-in integrations</summary>
Domain | Name
--- | ---
ai_task | AI Task
auth | Auth
binary_sensor | Binary Sensor
cloud | Home Assistant Cloud
cloud.ai_task | Unknown
cloud.binary_sensor | Unknown
cloud.conversation | Unknown
cloud.stt | Unknown
cloud.tts | Unknown
conversation | Conversation
ffmpeg | FFmpeg
hassio | hassio
homeassistant | Home Assistant Core Integration
http | HTTP
intent | Intent
media_source | Media Source
mock_no_info_integration | mock_no_info_integration
repairs | Repairs
stt | Speech-to-text (STT)
system_health | System Health
tts | Text-to-speech (TTS)
web_rtc | WebRTC
webhook | Webhook
</details>
<details><summary>Custom integrations</summary>
Domain | Name | Version | Documentation
--- | --- | --- | ---
test | Test Components | 1.2.3 | http://example.com
</details>
<details><summary>hassio</summary>
host_os | Home Assistant OS 14.0
--- | ---
update_channel | stable
supervisor_version | supervisor-2025.01.0
agent_version | 1.6.0
docker_version | 27.4.1
disk_total | 128.5 GB
disk_used | 45.2 GB
healthy | True
supported | True
host_connectivity | True
supervisor_connectivity | True
board | green
supervisor_api | ok
version_api | ok
installed_addons | Mosquitto broker (6.4.1), Samba share (12.3.2), Visual Studio Code (5.21.2)
</details>
<details><summary>mock_no_info_integration</summary>
No information available
</details>
<details><summary>cloud</summary>
logged_in | True
--- | ---
subscription_expiration | 2025-01-17T11:19:31+00:00
relayer_connected | True
relayer_region | xx-earth-616
remote_enabled | True
remote_connected | False
alexa_enabled | True
google_enabled | False
cloud_ice_servers_enabled | True
remote_server | us-west-1
certificate_status | ready
instance_id | 12345678901234567890
can_reach_cert_server | Exception: Unexpected exception
can_reach_cloud_auth | Failed: unreachable
can_reach_cloud | ok
</details>
## Host resource usage
Resource | Value
--- | ---
CPU usage | 25.5%
Memory total | 16.0 GB
Memory used | 8.0 GB (50.0%)
Memory available | 8.0 GB
Disk total | 500.0 GB
Disk used | 200.0 GB (40.0%)
Disk free | 300.0 GB
## Add-on resource usage
<details><summary>Add-on resources</summary>
Add-on | Version | State | CPU | Memory
--- | --- | --- | --- | ---
Mosquitto broker | 6.4.1 | started | 0.5% | 1.2%
Samba share | 12.3.2 | started | 0.1% | 0.8%
Visual Studio Code | 5.21.2 | stopped | N/A | N/A
</details>
## Full logs
<details><summary>Logs</summary>
```logs
2025-02-10 12:00:00.000 INFO (MainThread) [hass_nabucasa.iot] Hass nabucasa log
2025-02-10 12:00:00.000 WARNING (MainThread) [snitun.utils.aiohttp_client] Snitun log
2025-02-10 12:00:00.000 ERROR (MainThread) [homeassistant.components.cloud.client] Cloud log
```
</details>
'''
# ---
# name: test_download_support_package_host_resources
'''
## System Information
version | core-2025.2.0
--- | ---
installation_type | Home Assistant Container
dev | False
hassio | False
docker | True
container_arch | x86_64
user | root
virtualenv | False
python_version | 3.13.1
os_name | Linux
os_version | 6.12.9
arch | x86_64
timezone | US/Pacific
config_dir | config
## Active Integrations
Built-in integrations: 21
Custom integrations: 0
<details><summary>Built-in integrations</summary>
Domain | Name
--- | ---
ai_task | AI Task
auth | Auth
binary_sensor | Binary Sensor
cloud | Home Assistant Cloud
cloud.ai_task | Unknown
cloud.binary_sensor | Unknown
cloud.conversation | Unknown
cloud.stt | Unknown
cloud.tts | Unknown
conversation | Conversation
ffmpeg | FFmpeg
homeassistant | Home Assistant Core Integration
http | HTTP
intent | Intent
media_source | Media Source
repairs | Repairs
stt | Speech-to-text (STT)
system_health | System Health
tts | Text-to-speech (TTS)
web_rtc | WebRTC
webhook | Webhook
</details>
<details><summary>cloud</summary>
logged_in | True
--- | ---
subscription_expiration | 2025-01-17T11:19:31+00:00
relayer_connected | True
relayer_region | xx-earth-616
remote_enabled | True
remote_connected | False
alexa_enabled | True
google_enabled | False
cloud_ice_servers_enabled | True
remote_server | us-west-1
certificate_status | ready
instance_id | 12345678901234567890
can_reach_cert_server | Exception: Unexpected exception
can_reach_cloud_auth | Failed: unreachable
can_reach_cloud | ok
</details>
## Host resource usage
Resource | Value
--- | ---
CPU usage | 25.5%
Memory total | 16.0 GB
Memory used | 8.0 GB (50.0%)
Memory available | 8.0 GB
Disk total | 500.0 GB
Disk used | 200.0 GB (40.0%)
Disk free | 300.0 GB
## Full logs
<details><summary>Logs</summary>
```logs
2025-02-10 12:00:00.000 INFO (MainThread) [hass_nabucasa.iot] Hass nabucasa log
```
</details>
'''
# ---
# name: test_download_support_package_integration_load_error
'''
## System Information
@@ -516,18 +246,6 @@
</details>
## Host resource usage
Resource | Value
--- | ---
CPU usage | 25.5%
Memory total | 16.0 GB
Memory used | 8.0 GB (50.0%)
Memory available | 8.0 GB
Disk total | 500.0 GB
Disk used | 200.0 GB (40.0%)
Disk free | 300.0 GB
## Full logs
<details><summary>Logs</summary>

View File

@@ -1,6 +1,6 @@
"""Tests for the HTTP API for the cloud component."""
from collections.abc import Callable, Coroutine, Generator
from collections.abc import Callable, Coroutine
from copy import deepcopy
import datetime
from http import HTTPStatus
@@ -114,36 +114,6 @@ PIPELINE_DATA_OTHER = {
SUBSCRIPTION_INFO_URL = "https://api-test.hass.io/payments/subscription_info"
@pytest.fixture
def mock_psutil_wrapper() -> Generator[MagicMock]:
"""Fixture to mock psutil for support package tests."""
mock_memory = MagicMock()
mock_memory.total = 16 * 1024**3 # 16 GB
mock_memory.used = 8 * 1024**3 # 8 GB
mock_memory.available = 8 * 1024**3 # 8 GB
mock_memory.percent = 50.0
mock_disk = MagicMock()
mock_disk.total = 500 * 1024**3 # 500 GB
mock_disk.used = 200 * 1024**3 # 200 GB
mock_disk.free = 300 * 1024**3 # 300 GB
mock_disk.percent = 40.0
mock_psutil = MagicMock()
mock_psutil.cpu_percent = MagicMock(return_value=25.5)
mock_psutil.virtual_memory = MagicMock(return_value=mock_memory)
mock_psutil.disk_usage = MagicMock(return_value=mock_disk)
mock_wrapper = MagicMock()
mock_wrapper.psutil = mock_psutil
with patch(
"homeassistant.components.cloud.http_api.ha_psutil.PsutilWrapper",
return_value=mock_wrapper,
):
yield mock_wrapper
@pytest.fixture(name="setup_cloud")
async def setup_cloud_fixture(hass: HomeAssistant, cloud: MagicMock) -> None:
"""Fixture that sets up cloud."""
@@ -1876,7 +1846,7 @@ async def test_logout_view_dispatch_event(
@patch("homeassistant.components.cloud.helpers.FixedSizeQueueLogHandler.MAX_RECORDS", 3)
@pytest.mark.usefixtures("enable_custom_integrations", "mock_psutil_wrapper")
@pytest.mark.usefixtures("enable_custom_integrations")
async def test_download_support_package(
hass: HomeAssistant,
cloud: MagicMock,
@@ -1989,7 +1959,7 @@ async def test_download_support_package(
assert await req.text() == snapshot
@pytest.mark.usefixtures("enable_custom_integrations", "mock_psutil_wrapper")
@pytest.mark.usefixtures("enable_custom_integrations")
async def test_download_support_package_custom_components_error(
hass: HomeAssistant,
cloud: MagicMock,
@@ -2016,7 +1986,7 @@ async def test_download_support_package_custom_components_error(
async def mock_empty_info(hass: HomeAssistant) -> dict[str, Any]:
return {}
register.async_register_info(mock_empty_info, "/mock_integration")
register.async_register_info(mock_empty_info, "/config/mock_integration")
mock_platform(
hass,
@@ -2101,7 +2071,7 @@ async def test_download_support_package_custom_components_error(
assert await req.text() == snapshot
@pytest.mark.usefixtures("enable_custom_integrations", "mock_psutil_wrapper")
@pytest.mark.usefixtures("enable_custom_integrations")
async def test_download_support_package_integration_load_error(
hass: HomeAssistant,
cloud: MagicMock,
@@ -2128,7 +2098,7 @@ async def test_download_support_package_integration_load_error(
async def mock_empty_info(hass: HomeAssistant) -> dict[str, Any]:
return {}
register.async_register_info(mock_empty_info, "/mock_integration")
register.async_register_info(mock_empty_info, "/config/mock_integration")
mock_platform(
hass,
@@ -2218,277 +2188,6 @@ async def test_download_support_package_integration_load_error(
assert await req.text() == snapshot
@patch("homeassistant.components.cloud.helpers.FixedSizeQueueLogHandler.MAX_RECORDS", 3)
@pytest.mark.usefixtures("enable_custom_integrations", "mock_psutil_wrapper")
async def test_download_support_package_hassio(
hass: HomeAssistant,
cloud: MagicMock,
set_cloud_prefs: Callable[[dict[str, Any]], Coroutine[Any, Any, None]],
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
freezer: FrozenDateTimeFactory,
snapshot: SnapshotAssertion,
) -> None:
"""Test downloading a support package file with hassio resources."""
aioclient_mock.get("https://cloud.bla.com/status", text="")
aioclient_mock.get(
"https://cert-server/directory", exc=Exception("Unexpected exception")
)
aioclient_mock.get(
"https://cognito-idp.us-east-1.amazonaws.com/AAAA/.well-known/jwks.json",
exc=aiohttp.ClientError,
)
def async_register_hassio_platform(
hass: HomeAssistant,
register: system_health.SystemHealthRegistration,
) -> None:
async def mock_hassio_info(hass: HomeAssistant) -> dict[str, Any]:
return {
"host_os": "Home Assistant OS 14.0",
"update_channel": "stable",
"supervisor_version": "supervisor-2025.01.0",
"agent_version": "1.6.0",
"docker_version": "27.4.1",
"disk_total": "128.5 GB",
"disk_used": "45.2 GB",
"healthy": True,
"supported": True,
"host_connectivity": True,
"supervisor_connectivity": True,
"board": "green",
"supervisor_api": "ok",
"version_api": "ok",
"installed_addons": "Mosquitto broker (6.4.1), Samba share (12.3.2), Visual Studio Code (5.21.2)",
}
register.async_register_info(mock_hassio_info, "/hassio/system")
mock_platform(
hass,
"hassio.system_health",
MagicMock(async_register=async_register_hassio_platform),
)
hass.config.components.add("hassio")
def async_register_mock_platform(
hass: HomeAssistant,
register: system_health.SystemHealthRegistration,
) -> None:
async def mock_empty_info(hass: HomeAssistant) -> dict[str, Any]:
return {}
register.async_register_info(mock_empty_info, "/config/mock_integration")
mock_platform(
hass,
"mock_no_info_integration.system_health",
MagicMock(async_register=async_register_mock_platform),
)
hass.config.components.add("mock_no_info_integration")
hass.config.components.add("test")
assert await async_setup_component(hass, "system_health", {})
with patch("uuid.UUID.hex", new_callable=PropertyMock) as hexmock:
hexmock.return_value = "12345678901234567890"
assert await async_setup_component(
hass,
DOMAIN,
{
DOMAIN: {
"user_pool_id": "AAAA",
"region": "us-east-1",
"acme_server": "cert-server",
"relayer_server": "cloud.bla.com",
},
},
)
await hass.async_block_till_done()
await cloud.login("test-user", "test-pass")
cloud.remote.snitun_server = "us-west-1"
cloud.remote.certificate_status = CertificateStatus.READY
cloud.expiration_date = dt_util.parse_datetime("2025-01-17T11:19:31.0+00:00")
await cloud.client.async_system_message({"region": "xx-earth-616"})
await set_cloud_prefs(
{
"alexa_enabled": True,
"google_enabled": False,
"remote_enabled": True,
"cloud_ice_servers_enabled": True,
}
)
now = dt_util.utcnow()
tz = now.astimezone().tzinfo
freezer.move_to(datetime.datetime(2025, 2, 10, 12, 0, 0, tzinfo=tz))
logging.getLogger("hass_nabucasa.iot").info(
"This message will be dropped since this test patches MAX_RECORDS"
)
logging.getLogger("hass_nabucasa.iot").info("Hass nabucasa log")
logging.getLogger("snitun.utils.aiohttp_client").warning("Snitun log")
logging.getLogger("homeassistant.components.cloud.client").error("Cloud log")
freezer.move_to(now)
cloud_client = await hass_client()
with (
patch.object(hass.config, "config_dir", new="config"),
patch(
"homeassistant.components.homeassistant.system_health.system_info.async_get_system_info",
return_value={
"installation_type": "Home Assistant OS",
"version": "2025.2.0",
"dev": False,
"hassio": True,
"virtualenv": False,
"python_version": "3.13.1",
"docker": True,
"container_arch": "aarch64",
"arch": "aarch64",
"timezone": "US/Pacific",
"os_name": "Linux",
"os_version": "6.12.9",
"user": "root",
},
),
patch(
"homeassistant.components.cloud.http_api.get_supervisor_info",
return_value={
"addons": [
{
"slug": "core_mosquitto",
"name": "Mosquitto broker",
"version": "6.4.1",
"state": "started",
},
{
"slug": "core_samba",
"name": "Samba share",
"version": "12.3.2",
"state": "started",
},
{
"slug": "a0d7b954_vscode",
"name": "Visual Studio Code",
"version": "5.21.2",
"state": "stopped",
},
],
},
),
patch(
"homeassistant.components.cloud.http_api.get_addons_stats",
return_value={
"core_mosquitto": {
"cpu_percent": 0.5,
"memory_percent": 1.2,
},
"core_samba": {
"cpu_percent": 0.1,
"memory_percent": 0.8,
},
# No stats for vscode (stopped)
},
),
):
req = await cloud_client.get("/api/cloud/support_package")
assert req.status == HTTPStatus.OK
assert await req.text() == snapshot
@patch("homeassistant.components.cloud.helpers.FixedSizeQueueLogHandler.MAX_RECORDS", 3)
@pytest.mark.usefixtures("mock_psutil_wrapper")
async def test_download_support_package_host_resources(
hass: HomeAssistant,
cloud: MagicMock,
set_cloud_prefs: Callable[[dict[str, Any]], Coroutine[Any, Any, None]],
hass_client: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
freezer: FrozenDateTimeFactory,
snapshot: SnapshotAssertion,
) -> None:
"""Test downloading a support package file with psutil host resources (non-hassio)."""
aioclient_mock.get("https://cloud.bla.com/status", text="")
aioclient_mock.get(
"https://cert-server/directory", exc=Exception("Unexpected exception")
)
aioclient_mock.get(
"https://cognito-idp.us-east-1.amazonaws.com/AAAA/.well-known/jwks.json",
exc=aiohttp.ClientError,
)
assert await async_setup_component(hass, "system_health", {})
with patch("uuid.UUID.hex", new_callable=PropertyMock) as hexmock:
hexmock.return_value = "12345678901234567890"
assert await async_setup_component(
hass,
DOMAIN,
{
DOMAIN: {
"user_pool_id": "AAAA",
"region": "us-east-1",
"acme_server": "cert-server",
"relayer_server": "cloud.bla.com",
},
},
)
await hass.async_block_till_done()
await cloud.login("test-user", "test-pass")
cloud.remote.snitun_server = "us-west-1"
cloud.remote.certificate_status = CertificateStatus.READY
cloud.expiration_date = dt_util.parse_datetime("2025-01-17T11:19:31.0+00:00")
await cloud.client.async_system_message({"region": "xx-earth-616"})
await set_cloud_prefs(
{
"alexa_enabled": True,
"google_enabled": False,
"remote_enabled": True,
"cloud_ice_servers_enabled": True,
}
)
now = dt_util.utcnow()
tz = now.astimezone().tzinfo
freezer.move_to(datetime.datetime(2025, 2, 10, 12, 0, 0, tzinfo=tz))
logging.getLogger("hass_nabucasa.iot").info("Hass nabucasa log")
freezer.move_to(now)
cloud_client = await hass_client()
with (
patch.object(hass.config, "config_dir", new="config"),
patch(
"homeassistant.components.homeassistant.system_health.system_info.async_get_system_info",
return_value={
"installation_type": "Home Assistant Container",
"version": "2025.2.0",
"dev": False,
"hassio": False,
"virtualenv": False,
"python_version": "3.13.1",
"docker": True,
"container_arch": "x86_64",
"arch": "x86_64",
"timezone": "US/Pacific",
"os_name": "Linux",
"os_version": "6.12.9",
"user": "root",
},
),
):
req = await cloud_client.get("/api/cloud/support_package")
assert req.status == HTTPStatus.OK
assert await req.text() == snapshot
async def test_websocket_ice_servers(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,

View File

@@ -1,7 +1,5 @@
"""Tests for the SmartThings integration."""
import sys
import types
from typing import Any
from unittest.mock import AsyncMock
@@ -92,38 +90,3 @@ async def trigger_health_update(
if call[0][0] == device_id:
call[0][1](event)
await hass.async_block_till_done()
def ensure_haffmpeg_stubs() -> None:
"""Ensure haffmpeg stubs are available for SmartThings tests."""
if "haffmpeg" in sys.modules:
return
haffmpeg_module = types.ModuleType("haffmpeg")
haffmpeg_core_module = types.ModuleType("haffmpeg.core")
haffmpeg_tools_module = types.ModuleType("haffmpeg.tools")
class _StubHAFFmpeg: ...
class _StubFFVersion:
def __init__(self, bin_path: str | None = None) -> None:
self.bin_path = bin_path
async def get_version(self) -> str:
return "4.0.0"
class _StubImageFrame: ...
haffmpeg_core_module.HAFFmpeg = _StubHAFFmpeg
haffmpeg_tools_module.IMAGE_JPEG = b""
haffmpeg_tools_module.FFVersion = _StubFFVersion
haffmpeg_tools_module.ImageFrame = _StubImageFrame
haffmpeg_module.core = haffmpeg_core_module
haffmpeg_module.tools = haffmpeg_tools_module
sys.modules["haffmpeg"] = haffmpeg_module
sys.modules["haffmpeg.core"] = haffmpeg_core_module
sys.modules["haffmpeg.tools"] = haffmpeg_tools_module
ensure_haffmpeg_stubs()

View File

@@ -37,7 +37,7 @@
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <MediaPlayerEntityFeature: 24461>,
'supported_features': <MediaPlayerEntityFeature: 23949>,
'translation_key': None,
'unique_id': 'afcf3b91-0000-1111-2222-ddff2a0a6577_main',
'unit_of_measurement': None,
@@ -59,7 +59,7 @@
'HDMI2',
'digital',
]),
'supported_features': <MediaPlayerEntityFeature: 24461>,
'supported_features': <MediaPlayerEntityFeature: 23949>,
'volume_level': 0.01,
}),
'context': <ANY>,
@@ -101,7 +101,7 @@
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <MediaPlayerEntityFeature: 318989>,
'supported_features': <MediaPlayerEntityFeature: 318477>,
'translation_key': None,
'unique_id': 'c9276e43-fe3c-88c3-1dcc-2eb79e292b8c_main',
'unit_of_measurement': None,
@@ -115,7 +115,7 @@
'is_volume_muted': False,
'repeat': <RepeatMode.OFF: 'off'>,
'shuffle': False,
'supported_features': <MediaPlayerEntityFeature: 318989>,
'supported_features': <MediaPlayerEntityFeature: 318477>,
'volume_level': 0.52,
}),
'context': <ANY>,
@@ -157,7 +157,7 @@
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <MediaPlayerEntityFeature: 22029>,
'supported_features': <MediaPlayerEntityFeature: 21517>,
'translation_key': None,
'unique_id': 'c85fced9-c474-4a47-93c2-037cc7829536_main',
'unit_of_measurement': None,
@@ -171,7 +171,7 @@
'is_volume_muted': False,
'media_artist': 'David Guetta',
'media_title': 'Forever Young',
'supported_features': <MediaPlayerEntityFeature: 22029>,
'supported_features': <MediaPlayerEntityFeature: 21517>,
'volume_level': 0.15,
}),
'context': <ANY>,
@@ -213,7 +213,7 @@
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <MediaPlayerEntityFeature: 22413>,
'supported_features': <MediaPlayerEntityFeature: 21901>,
'translation_key': None,
'unique_id': '0d94e5db-8501-2355-eb4f-214163702cac_main',
'unit_of_measurement': None,
@@ -228,7 +228,7 @@
'media_artist': '',
'media_title': '',
'source': 'HDMI1',
'supported_features': <MediaPlayerEntityFeature: 22413>,
'supported_features': <MediaPlayerEntityFeature: 21901>,
'volume_level': 0.17,
}),
'context': <ANY>,
@@ -270,7 +270,7 @@
'platform': 'smartthings',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <MediaPlayerEntityFeature: 1932>,
'supported_features': <MediaPlayerEntityFeature: 1420>,
'translation_key': None,
'unique_id': 'a75cb1e1-03fd-3c77-ca9f-d4e56c4096c6_main',
'unit_of_measurement': None,
@@ -281,7 +281,7 @@
'attributes': ReadOnlyDict({
'device_class': 'speaker',
'friendly_name': 'Soundbar',
'supported_features': <MediaPlayerEntityFeature: 1932>,
'supported_features': <MediaPlayerEntityFeature: 1420>,
}),
'context': <ANY>,
'entity_id': 'media_player.soundbar',

View File

@@ -1,531 +0,0 @@
"""Tests for SmartThings audio helper."""
from __future__ import annotations
import asyncio
import logging
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
from urllib.parse import urlsplit
import pytest
from homeassistant.components.smartthings.audio import (
FFMPEG_MAX_DURATION_SECONDS,
MAX_STORED_ENTRIES,
PCM_CHANNELS,
PCM_MIME,
PCM_SAMPLE_RATE,
PCM_SAMPLE_WIDTH,
TRANSCODE_TIMEOUT_SECONDS,
WARNING_DURATION_SECONDS,
SmartThingsAudioError,
async_get_audio_manager,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.network import NoURLAvailableError
from tests.typing import ClientSessionGenerator
class _FakeProcess:
"""Async subprocess stand-in that provides communicate."""
def __init__(self, stdout: bytes, stderr: bytes, returncode: int) -> None:
self._stdout = stdout
self._stderr = stderr
self.returncode = returncode
self.killed = False
async def communicate(self) -> tuple[bytes, bytes]:
return self._stdout, self._stderr
def kill(self) -> None:
self.killed = True
def _build_pcm(
duration_seconds: float = 1.0,
*,
sample_rate: int = PCM_SAMPLE_RATE,
sample_width: int = PCM_SAMPLE_WIDTH,
channels: int = PCM_CHANNELS,
) -> bytes:
"""Generate silent raw PCM bytes for testing."""
frame_count = int(sample_rate * duration_seconds)
return b"\x00" * frame_count * sample_width * channels
async def test_prepare_notification_creates_url(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
) -> None:
"""Ensure PCM proxy URLs are generated and served."""
hass.config.external_url = "https://example.com"
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm()
with patch.object(
manager, "_transcode_to_pcm", AsyncMock(return_value=(pcm_bytes, 1.0, False))
):
url = await manager.async_prepare_notification("https://example.com/source.mp3")
parsed = urlsplit(url)
assert parsed.path.endswith(".pcm")
assert not parsed.query
client = await hass_client_no_auth()
response = await client.get(parsed.path)
assert response.status == 200
assert response.headers["Content-Type"] == PCM_MIME
assert response.headers["Cache-Control"] == "no-store"
body = await response.read()
assert body == pcm_bytes
@pytest.mark.asyncio
async def test_prepare_notification_uses_internal_url_when_external_missing(
hass: HomeAssistant,
) -> None:
"""Fallback to the internal URL if no external URL is available."""
hass.config.external_url = None
hass.config.internal_url = "http://homeassistant.local:8123"
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm()
with patch.object(
manager, "_transcode_to_pcm", AsyncMock(return_value=(pcm_bytes, 1.0, False))
):
url = await manager.async_prepare_notification("https://example.com/source.mp3")
parsed = urlsplit(url)
assert parsed.scheme == "http"
assert parsed.netloc == "homeassistant.local:8123"
assert parsed.path.endswith(".pcm")
@pytest.mark.asyncio
async def test_prepare_notification_requires_accessible_url(
hass: HomeAssistant,
) -> None:
"""Fail if neither external nor internal URLs are available."""
hass.config.external_url = None
hass.config.internal_url = None
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm()
with (
patch.object(
manager,
"_transcode_to_pcm",
AsyncMock(return_value=(pcm_bytes, 1.0, False)),
),
patch(
"homeassistant.components.smartthings.audio.get_url",
side_effect=NoURLAvailableError,
) as mock_get_url,
pytest.raises(SmartThingsAudioError),
):
await manager.async_prepare_notification("https://example.com/source.mp3")
assert mock_get_url.called
# Stored entry should be cleaned up after failure so subsequent requests
# don't leak memory or serve stale audio.
assert not manager._entries
async def test_audio_view_returns_404_for_unknown_token(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
) -> None:
"""Unknown tokens should return 404."""
await async_get_audio_manager(hass)
client = await hass_client_no_auth()
response = await client.get("/api/smartthings/audio/invalid-token.pcm")
assert response.status == 404
@pytest.mark.asyncio
async def test_prepare_notification_raises_when_transcode_empty(
hass: HomeAssistant,
) -> None:
"""Transcoding empty audio results in an error."""
hass.config.external_url = "https://example.com"
manager = await async_get_audio_manager(hass)
with (
patch.object(
manager, "_transcode_to_pcm", AsyncMock(return_value=(b"", 0.0, False))
),
pytest.raises(SmartThingsAudioError, match="Converted audio is empty"),
):
await manager.async_prepare_notification("https://example.com/source.mp3")
@pytest.mark.asyncio
async def test_prepare_notification_warns_when_duration_exceeds_max(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Warn when transcoded audio exceeds the SmartThings duration limit."""
hass.config.external_url = "https://example.com"
manager = await async_get_audio_manager(hass)
pcm_bytes = b"pcm"
caplog.set_level(logging.WARNING)
with patch.object(
manager,
"_transcode_to_pcm",
AsyncMock(return_value=(pcm_bytes, FFMPEG_MAX_DURATION_SECONDS + 1.0, True)),
):
await manager.async_prepare_notification("https://example.com/source.mp3")
assert any("truncated" in record.message for record in caplog.records)
@pytest.mark.asyncio
async def test_prepare_notification_warns_when_duration_exceeds_warning(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Warn when transcoded audio exceeds the SmartThings warning threshold."""
hass.config.external_url = "https://example.com"
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm(duration_seconds=WARNING_DURATION_SECONDS + 1)
caplog.set_level(logging.WARNING)
with patch.object(
manager,
"_transcode_to_pcm",
AsyncMock(return_value=(pcm_bytes, WARNING_DURATION_SECONDS + 1.0, False)),
):
await manager.async_prepare_notification("https://example.com/source.mp3")
assert any(
"playback over" in record.message and "truncated" not in record.message
for record in caplog.records
)
@pytest.mark.asyncio
async def test_prepare_notification_regenerates_token_on_collision(
hass: HomeAssistant,
) -> None:
"""Regenerate tokens when a collision is detected."""
hass.config.external_url = "https://example.com"
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm()
with (
patch.object(
manager,
"_transcode_to_pcm",
AsyncMock(return_value=(pcm_bytes, 1.0, False)),
),
patch(
"homeassistant.components.smartthings.audio.secrets.token_urlsafe",
side_effect=["dup", "dup", "unique"],
),
):
url1 = await manager.async_prepare_notification(
"https://example.com/source.mp3"
)
url2 = await manager.async_prepare_notification(
"https://example.com/source.mp3"
)
assert urlsplit(url1).path.endswith("/dup.pcm")
assert urlsplit(url2).path.endswith("/unique.pcm")
@pytest.mark.asyncio
async def test_prepare_notification_schedules_cleanup(
hass: HomeAssistant,
) -> None:
"""Ensure cached entries are scheduled for cleanup."""
hass.config.external_url = "https://example.com"
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm()
with patch.object(
manager,
"_transcode_to_pcm",
AsyncMock(return_value=(pcm_bytes, 1.0, False)),
):
await manager.async_prepare_notification("https://example.com/source.mp3")
assert manager._cleanup_handle is not None
for entry in manager._entries.values():
entry.expires = 0
manager._cleanup_callback()
assert not manager._entries
@pytest.mark.asyncio
async def test_prepare_notification_caps_entry_count(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Ensure cached entries are capped."""
hass.config.external_url = "https://example.com"
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm()
caplog.set_level(logging.DEBUG)
with patch.object(
manager,
"_transcode_to_pcm",
AsyncMock(return_value=(pcm_bytes, 1.0, False)),
):
for _ in range(MAX_STORED_ENTRIES + 2):
await manager.async_prepare_notification("https://example.com/source.mp3")
assert len(manager._entries) == MAX_STORED_ENTRIES
assert any(
"Dropped oldest SmartThings audio token" in record.message
for record in caplog.records
)
@pytest.mark.asyncio
async def test_transcode_to_pcm_handles_missing_ffmpeg(
hass: HomeAssistant,
) -> None:
"""Raise friendly error when ffmpeg is unavailable."""
manager = await async_get_audio_manager(hass)
with (
patch(
"homeassistant.components.smartthings.audio.ffmpeg.get_ffmpeg_manager",
return_value=SimpleNamespace(binary="ffmpeg"),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.create_subprocess_exec",
side_effect=FileNotFoundError,
),
pytest.raises(SmartThingsAudioError, match="FFmpeg is required"),
):
await manager._transcode_to_pcm("https://example.com/source.mp3")
@pytest.mark.asyncio
async def test_transcode_to_pcm_handles_process_failure(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Raise when ffmpeg reports an error."""
manager = await async_get_audio_manager(hass)
caplog.set_level(logging.ERROR)
fake_process = _FakeProcess(stdout=b"", stderr=b"boom", returncode=1)
with (
patch(
"homeassistant.components.smartthings.audio.ffmpeg.get_ffmpeg_manager",
return_value=SimpleNamespace(binary="ffmpeg"),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.create_subprocess_exec",
AsyncMock(return_value=fake_process),
),
pytest.raises(SmartThingsAudioError, match="Unable to convert"),
):
await manager._transcode_to_pcm("https://example.com/source.mp3")
assert any("FFmpeg failed" in record.message for record in caplog.records)
@pytest.mark.asyncio
async def test_transcode_to_pcm_times_out_and_kills_process(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Kill ffmpeg when the transcode times out."""
manager = await async_get_audio_manager(hass)
fake_process = _FakeProcess(stdout=b"\x00\x00", stderr=b"", returncode=0)
caplog.set_level(logging.WARNING)
with (
patch(
"homeassistant.components.smartthings.audio.ffmpeg.get_ffmpeg_manager",
return_value=SimpleNamespace(binary="ffmpeg"),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.create_subprocess_exec",
AsyncMock(return_value=fake_process),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.wait_for",
side_effect=TimeoutError,
),
):
pcm, duration, truncated = await manager._transcode_to_pcm(
"https://example.com/source.mp3"
)
assert fake_process.killed is True
assert pcm == b"\x00\x00"
assert duration == pytest.approx(1 / PCM_SAMPLE_RATE)
assert truncated is False
assert any("FFmpeg timed out" in record.message for record in caplog.records)
@pytest.mark.asyncio
async def test_transcode_to_pcm_returns_empty_audio(
hass: HomeAssistant,
) -> None:
"""Return empty payload when ffmpeg produced nothing."""
manager = await async_get_audio_manager(hass)
fake_process = _FakeProcess(stdout=b"", stderr=b"", returncode=0)
with (
patch(
"homeassistant.components.smartthings.audio.ffmpeg.get_ffmpeg_manager",
return_value=SimpleNamespace(binary="ffmpeg"),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.create_subprocess_exec",
AsyncMock(return_value=fake_process),
) as mock_exec,
):
pcm, duration, truncated = await manager._transcode_to_pcm(
"https://example.com/source.mp3"
)
assert pcm == b""
assert duration == 0.0
assert truncated is False
mock_exec.assert_awaited_once()
@pytest.mark.asyncio
async def test_transcode_to_pcm_enforces_duration_cap(
hass: HomeAssistant,
) -> None:
"""Ensure ffmpeg is instructed to limit duration and timeout is enforced."""
manager = await async_get_audio_manager(hass)
pcm_bytes = _build_pcm(duration_seconds=FFMPEG_MAX_DURATION_SECONDS)
fake_process = _FakeProcess(stdout=pcm_bytes, stderr=b"", returncode=0)
timeouts: list[float] = []
original_wait_for = asyncio.wait_for
async def _wait_for(awaitable, timeout):
timeouts.append(timeout)
return await original_wait_for(awaitable, timeout)
mock_exec = AsyncMock(return_value=fake_process)
with (
patch(
"homeassistant.components.smartthings.audio.ffmpeg.get_ffmpeg_manager",
return_value=SimpleNamespace(binary="ffmpeg"),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.create_subprocess_exec",
mock_exec,
),
patch(
"homeassistant.components.smartthings.audio.asyncio.wait_for",
new=_wait_for,
),
):
pcm, duration, truncated = await manager._transcode_to_pcm(
"https://example.com/source.mp3"
)
command = list(mock_exec.await_args.args)
assert "-t" in command
assert command[command.index("-t") + 1] == str(FFMPEG_MAX_DURATION_SECONDS)
assert timeouts == [TRANSCODE_TIMEOUT_SECONDS]
assert pcm == pcm_bytes
assert duration == pytest.approx(FFMPEG_MAX_DURATION_SECONDS)
assert truncated is True
async def test_transcode_to_pcm_logs_misaligned_pcm(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Log debug output when ffmpeg output contains a partial frame."""
manager = await async_get_audio_manager(hass)
caplog.set_level(logging.DEBUG)
pcm_bytes = _build_pcm() + b"\xaa"
fake_process = _FakeProcess(stdout=pcm_bytes, stderr=b"", returncode=0)
with (
patch(
"homeassistant.components.smartthings.audio.ffmpeg.get_ffmpeg_manager",
return_value=SimpleNamespace(binary="ffmpeg"),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.create_subprocess_exec",
AsyncMock(return_value=fake_process),
),
):
pcm, duration, truncated = await manager._transcode_to_pcm(
"https://example.com/source.mp3"
)
assert pcm == _build_pcm()
assert duration > 0
assert truncated is False
assert any("misaligned PCM" in record.message for record in caplog.records)
@pytest.mark.asyncio
async def test_transcode_to_pcm_drops_partial_frame_payload(
hass: HomeAssistant,
caplog: pytest.LogCaptureFixture,
) -> None:
"""Drop audio entirely when ffmpeg returns fewer bytes than a full frame."""
manager = await async_get_audio_manager(hass)
caplog.set_level(logging.DEBUG)
fake_process = _FakeProcess(stdout=b"\x00", stderr=b"", returncode=0)
with (
patch(
"homeassistant.components.smartthings.audio.ffmpeg.get_ffmpeg_manager",
return_value=SimpleNamespace(binary="ffmpeg"),
),
patch(
"homeassistant.components.smartthings.audio.asyncio.create_subprocess_exec",
AsyncMock(return_value=fake_process),
),
):
pcm, duration, truncated = await manager._transcode_to_pcm(
"https://example.com/source.mp3"
)
assert pcm == b""
assert duration == 0.0
assert truncated is False
assert any("misaligned PCM" in record.message for record in caplog.records)

View File

@@ -1,7 +1,6 @@
"""Test for the SmartThings media player platform."""
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch
from unittest.mock import AsyncMock
from pysmartthings import Attribute, Capability, Command, Status
from pysmartthings.models import HealthStatus
@@ -10,19 +9,14 @@ from syrupy.assertion import SnapshotAssertion
from homeassistant.components.media_player import (
ATTR_INPUT_SOURCE,
ATTR_MEDIA_CONTENT_ID,
ATTR_MEDIA_CONTENT_TYPE,
ATTR_MEDIA_REPEAT,
ATTR_MEDIA_SHUFFLE,
ATTR_MEDIA_VOLUME_LEVEL,
ATTR_MEDIA_VOLUME_MUTED,
DOMAIN as MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
SERVICE_SELECT_SOURCE,
MediaType,
RepeatMode,
)
from homeassistant.components.smartthings.audio import SmartThingsAudioError
from homeassistant.components.smartthings.const import MAIN
from homeassistant.const import (
ATTR_ENTITY_ID,
@@ -45,7 +39,6 @@ from homeassistant.const import (
Platform,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import entity_registry as er
from . import (
@@ -205,176 +198,6 @@ async def test_volume_down(
)
@pytest.mark.parametrize("device_fixture", ["hw_q80r_soundbar"])
async def test_play_media_notification(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test playing media via SmartThings audio notification."""
await setup_integration(hass, mock_config_entry)
manager = AsyncMock()
manager.async_prepare_notification.return_value = "https://example.com/audio.pcm"
with (
patch(
"homeassistant.components.smartthings.media_player.async_get_audio_manager",
AsyncMock(return_value=manager),
),
patch(
"homeassistant.components.smartthings.media_player.async_process_play_media_url",
return_value="https://example.com/source.mp3",
),
):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: "media_player.soundbar",
ATTR_MEDIA_CONTENT_TYPE: MediaType.MUSIC,
ATTR_MEDIA_CONTENT_ID: "https://example.com/source.mp3",
},
blocking=True,
)
expected_command = Command("playTrackAndResume")
devices.execute_device_command.assert_called_once_with(
"afcf3b91-0000-1111-2222-ddff2a0a6577",
Capability.AUDIO_NOTIFICATION,
expected_command,
MAIN,
argument=["https://example.com/audio.pcm"],
)
@pytest.mark.parametrize("device_fixture", ["hw_q80r_soundbar"])
async def test_play_media_requires_audio_notification_capability(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Expect an error if the device lacks audio notification support."""
devices.get_device_status.return_value[MAIN].pop(
Capability.AUDIO_NOTIFICATION, None
)
await setup_integration(hass, mock_config_entry)
entity = hass.data["entity_components"][MEDIA_PLAYER_DOMAIN].get_entity(
"media_player.soundbar"
)
assert entity is not None
with pytest.raises(
HomeAssistantError, match="Device does not support audio notifications"
):
await entity.async_play_media(MediaType.MUSIC, "https://example.com/source.mp3")
@pytest.mark.parametrize("device_fixture", ["hw_q80r_soundbar"])
async def test_play_media_rejects_unsupported_media_type(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Unsupported media types should raise an error."""
await setup_integration(hass, mock_config_entry)
entity = hass.data["entity_components"][MEDIA_PLAYER_DOMAIN].get_entity(
"media_player.soundbar"
)
assert entity is not None
with pytest.raises(
HomeAssistantError, match="Unsupported media type for SmartThings audio"
):
await entity.async_play_media(
MediaType.TVSHOW, "https://example.com/source.mp3"
)
@pytest.mark.parametrize("device_fixture", ["hw_q80r_soundbar"])
async def test_play_media_uses_media_source_resolution(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Media source IDs are resolved and processed before playback."""
await setup_integration(hass, mock_config_entry)
manager = AsyncMock()
manager.async_prepare_notification.return_value = "https://example.com/audio.pcm"
with (
patch(
"homeassistant.components.smartthings.media_player.async_get_audio_manager",
AsyncMock(return_value=manager),
),
patch(
"homeassistant.components.smartthings.media_player.async_process_play_media_url",
return_value="https://example.com/processed.mp3",
) as mock_process,
patch(
"homeassistant.components.smartthings.media_player.media_source.is_media_source_id",
return_value=True,
) as mock_is_media,
patch(
"homeassistant.components.smartthings.media_player.media_source.async_resolve_media",
AsyncMock(
return_value=SimpleNamespace(url="https://example.com/from_source")
),
) as mock_resolve,
):
entity = hass.data["entity_components"][MEDIA_PLAYER_DOMAIN].get_entity(
"media_player.soundbar"
)
assert entity is not None
await entity.async_play_media(MediaType.MUSIC, "media-source://foo")
mock_is_media.assert_called_once()
mock_resolve.assert_called_once()
mock_process.assert_called_with(hass, "https://example.com/from_source")
devices.execute_device_command.assert_called_once()
@pytest.mark.parametrize("device_fixture", ["hw_q80r_soundbar"])
async def test_play_media_wraps_audio_errors(
hass: HomeAssistant,
devices: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""SmartThings audio errors propagate as HomeAssistantError."""
await setup_integration(hass, mock_config_entry)
manager = AsyncMock()
manager.async_prepare_notification.side_effect = SmartThingsAudioError("boom")
entity = hass.data["entity_components"][MEDIA_PLAYER_DOMAIN].get_entity(
"media_player.soundbar"
)
assert entity is not None
with (
patch(
"homeassistant.components.smartthings.media_player.async_get_audio_manager",
AsyncMock(return_value=manager),
),
patch(
"homeassistant.components.smartthings.media_player.async_process_play_media_url",
return_value="https://example.com/source.mp3",
),
pytest.raises(HomeAssistantError, match="boom"),
):
await entity.async_play_media(MediaType.MUSIC, "https://example.com/source.mp3")
@pytest.mark.parametrize("device_fixture", ["hw_q80r_soundbar"])
async def test_media_play(
hass: HomeAssistant,