Compare commits

..

4 Commits

Author SHA1 Message Date
Paul Bottein dd86e84470 Fix comments and bump lib 2026-05-27 10:49:39 +02:00
Paul Bottein b9f98274af Improve tests 2026-05-27 10:16:54 +02:00
Paul Bottein 02c7582fb6 Fix media type 2026-05-27 10:08:26 +02:00
Paul Bottein 00ebccf168 Add browse and play media support to Yoto 2026-05-27 00:20:42 +02:00
22 changed files with 768 additions and 729 deletions
+18 -52
View File
@@ -5,12 +5,8 @@ from typing import Any
import voluptuous as vol
from homeassistant.components import labs, websocket_api
from homeassistant.components.hassio import HassioNotReadyError
from homeassistant.config_entries import SOURCE_SYSTEM, ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import discovery_flow
from homeassistant.helpers.start import async_at_started
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import Event, HomeAssistant, callback
from homeassistant.helpers.typing import ConfigType
from homeassistant.util.hass_dict import HassKey
@@ -53,7 +49,6 @@ CONFIG_SCHEMA = vol.Schema(
)
DATA_COMPONENT: HassKey[Analytics] = HassKey(DOMAIN)
_DATA_SNAPSHOTS_URL: HassKey[str | None] = HassKey(f"{DOMAIN}_snapshots_url")
LABS_SNAPSHOT_FEATURE = "snapshots"
@@ -62,39 +57,18 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the analytics integration."""
analytics_config = config.get(DOMAIN, {})
snapshots_url: str | None = None
if CONF_SNAPSHOTS_URL in analytics_config:
await labs.async_update_preview_feature(
hass, DOMAIN, LABS_SNAPSHOT_FEATURE, enabled=True
)
snapshots_url = analytics_config[CONF_SNAPSHOTS_URL]
else:
snapshots_url = None
hass.data[_DATA_SNAPSHOTS_URL] = snapshots_url
discovery_flow.async_create_flow(
hass, DOMAIN, context={"source": SOURCE_SYSTEM}, data={}
)
websocket_api.async_register_command(hass, websocket_analytics)
websocket_api.async_register_command(hass, websocket_analytics_preferences)
hass.http.register_view(AnalyticsDevicesView)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up Analytics from a config entry."""
snapshots_url = hass.data.get(_DATA_SNAPSHOTS_URL)
analytics = Analytics(hass, snapshots_url)
try:
await analytics.load()
except HassioNotReadyError as err:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="supervisor_not_ready",
) from err
# Load stored data
await analytics.load()
started = False
@@ -106,30 +80,26 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
if started:
await analytics.async_schedule()
async def start_schedule(hass: HomeAssistant) -> None:
"""Start the send schedule once Home Assistant has started."""
async def start_schedule(_event: Event) -> None:
"""Start the send schedule after the started event."""
nonlocal started
started = True
await analytics.async_schedule()
entry.async_on_unload(
labs.async_subscribe_preview_feature(
hass, DOMAIN, LABS_SNAPSHOT_FEATURE, _async_handle_labs_update
)
labs.async_subscribe_preview_feature(
hass, DOMAIN, LABS_SNAPSHOT_FEATURE, _async_handle_labs_update
)
entry.async_on_unload(async_at_started(hass, start_schedule))
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, start_schedule)
websocket_api.async_register_command(hass, websocket_analytics)
websocket_api.async_register_command(hass, websocket_analytics_preferences)
hass.http.register_view(AnalyticsDevicesView)
hass.data[DATA_COMPONENT] = analytics
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload an Analytics config entry."""
analytics = hass.data.pop(DATA_COMPONENT)
analytics.cancel_scheduled()
return True
@callback
@websocket_api.require_admin
@websocket_api.websocket_command({vol.Required("type"): "analytics"})
@@ -139,9 +109,7 @@ def websocket_analytics(
msg: dict[str, Any],
) -> None:
"""Return analytics preferences."""
if (analytics := hass.data.get(DATA_COMPONENT)) is None:
connection.send_error(msg["id"], websocket_api.ERR_NOT_FOUND, "Not loaded")
return
analytics = hass.data[DATA_COMPONENT]
connection.send_result(
msg["id"],
{ATTR_PREFERENCES: analytics.preferences, ATTR_ONBOARDED: analytics.onboarded},
@@ -162,10 +130,8 @@ async def websocket_analytics_preferences(
msg: dict[str, Any],
) -> None:
"""Update analytics preferences."""
if (analytics := hass.data.get(DATA_COMPONENT)) is None:
connection.send_error(msg["id"], websocket_api.ERR_NOT_FOUND, "Not loaded")
return
preferences = msg[ATTR_PREFERENCES]
analytics = hass.data[DATA_COMPONENT]
await analytics.save_preferences(preferences)
await analytics.async_schedule()
+10 -16
View File
@@ -299,8 +299,12 @@ class Analytics:
self._data = AnalyticsData.from_dict(stored)
if self.supervisor and not self.onboarded:
# This may raise HassioNotReadyError if Supervisor was unreachable.
# The caller is responsible for handling this and triggering a retry.
# This may raise HassioNotReadyError if Supervisor was unreachable
# during setup of the Supervisor integration. That will fail setup
# of this integration. However there is no better option at this time
# since we need to get the diagnostic setting from Supervisor to correctly
# setup this integration and we can't raise ConfigEntryNotReady to
# trigger a retry from async_setup.
supervisor_info = hassio.get_supervisor_info(self._hass)
# User have not configured analytics, get this setting from the supervisor
@@ -345,10 +349,10 @@ class Analytics:
await self._save()
if self.supervisor:
# Try to pull Supervisor information, but don't fail if some or all
# of it is unavailable due to setup failures in the hassio integration.
with contextlib.suppress(hassio.HassioNotReadyError):
supervisor_info = hassio.get_supervisor_info(hass)
# get_supervisor_info was called during setup so we can't get here
# if it raised. The others may raise HassioNotReadyError if only some
# data was successfully fetched from Supervisor
supervisor_info = hassio.get_supervisor_info(hass)
with contextlib.suppress(hassio.HassioNotReadyError):
operating_system_info = hassio.get_os_info(hass)
with contextlib.suppress(hassio.HassioNotReadyError):
@@ -626,16 +630,6 @@ class Analytics:
err,
)
@callback
def cancel_scheduled(self) -> None:
"""Cancel all scheduled analytics tasks."""
if self._basic_scheduled is not None:
self._basic_scheduled()
self._basic_scheduled = None
if self._snapshot_scheduled is not None:
self._snapshot_scheduled()
self._snapshot_scheduled = None
async def async_schedule(self) -> None:
"""Schedule analytics."""
if not self.onboarded:
@@ -1,19 +0,0 @@
"""Config flow for Analytics integration."""
from typing import Any
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from .const import DOMAIN
class AnalyticsConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Analytics."""
VERSION = 1
async def async_step_system(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
return self.async_create_entry(title="Analytics", data={})
@@ -3,7 +3,6 @@
"name": "Analytics",
"after_dependencies": ["energy", "hassio", "recorder"],
"codeowners": ["@home-assistant/core"],
"config_flow": true,
"dependencies": ["api", "websocket_api", "http"],
"documentation": "https://www.home-assistant.io/integrations/analytics",
"integration_type": "system",
@@ -15,6 +14,5 @@
"report_issue_url": "https://github.com/OHF-Device-Database/device-database/issues/new"
}
},
"quality_scale": "internal",
"single_config_entry": true
"quality_scale": "internal"
}
@@ -1,9 +1,4 @@
{
"exceptions": {
"supervisor_not_ready": {
"message": "Supervisor was not ready during setup, will retry"
}
},
"preview_features": {
"snapshots": {
"description": "We're creating the [Open Home Foundation Device Database](https://www.home-assistant.io/blog/2026/02/02/about-device-database/): a free, open source community-powered resource to help users find practical information about how smart home devices perform in real installations.\n\nYou can help us build it by opting in to share anonymized data about your devices. This data will only ever include device-specific details (like model or manufacturer) never personally identifying information (like the names you assign).\n\nFind out how we process your data (should you choose to contribute) in our [Data Use Statement](https://www.openhomefoundation.org/device-database-data-use-statement).",
+1 -17
View File
@@ -15,7 +15,6 @@ from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from .const import DOMAIN
from .validation import UnsupportedBoardError, async_get_supported_board_info
_LOGGER = logging.getLogger(__name__)
@@ -44,11 +43,6 @@ class DucoConfigFlow(ConfigFlow, domain=DOMAIN):
try:
box_name, _ = await self._validate_input(discovery_info.ip)
except UnsupportedBoardError:
_LOGGER.debug(
"Unsupported Duco board discovered via DHCP at %s", discovery_info.ip
)
return self.async_abort(reason="unsupported_board")
except DucoConnectionError:
return self.async_abort(reason="cannot_connect")
except DucoError:
@@ -67,12 +61,6 @@ class DucoConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle zeroconf discovery."""
try:
box_name, mac = await self._validate_input(discovery_info.host)
except UnsupportedBoardError:
_LOGGER.debug(
"Unsupported Duco board discovered via zeroconf at %s",
discovery_info.host,
)
return self.async_abort(reason="unsupported_board")
except DucoConnectionError:
return self.async_abort(reason="cannot_connect")
except DucoError:
@@ -114,8 +102,6 @@ class DucoConfigFlow(ConfigFlow, domain=DOMAIN):
if user_input is not None:
try:
box_name, mac = await self._validate_input(user_input[CONF_HOST])
except UnsupportedBoardError:
errors["base"] = "unsupported_board"
except DucoConnectionError:
errors["base"] = "cannot_connect"
except DucoError:
@@ -147,8 +133,6 @@ class DucoConfigFlow(ConfigFlow, domain=DOMAIN):
if user_input is not None:
try:
box_name, mac = await self._validate_input(user_input[CONF_HOST])
except UnsupportedBoardError:
errors["base"] = "unsupported_board"
except DucoConnectionError:
errors["base"] = "cannot_connect"
except DucoError:
@@ -178,6 +162,6 @@ class DucoConfigFlow(ConfigFlow, domain=DOMAIN):
session=async_get_clientsession(self.hass),
host=host,
)
board_info = await async_get_supported_board_info(client)
board_info = await client.async_get_board_info()
lan_info = await client.async_get_lan_info()
return board_info.box_name, lan_info.mac
+16 -18
View File
@@ -4,11 +4,7 @@ from dataclasses import dataclass
import logging
from duco_connectivity import DucoClient
from duco_connectivity.exceptions import (
DucoConnectionError,
DucoError,
DucoResponseError,
)
from duco_connectivity.exceptions import DucoConnectionError, DucoError
from duco_connectivity.models import BoardInfo, Node
from homeassistant.config_entries import ConfigEntry
@@ -17,7 +13,6 @@ from homeassistant.exceptions import ConfigEntryError
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, SCAN_INTERVAL
from .validation import UnsupportedBoardError, async_get_supported_board_info
_LOGGER = logging.getLogger(__name__)
@@ -57,18 +52,7 @@ class DucoCoordinator(DataUpdateCoordinator[DucoData]):
async def _async_setup(self) -> None:
"""Fetch board info once during initial setup."""
try:
self.board_info = await async_get_supported_board_info(self.client)
except UnsupportedBoardError as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="unsupported_board",
) from err
except DucoResponseError as err:
raise ConfigEntryError(
translation_domain=DOMAIN,
translation_key="api_error",
translation_placeholders={"error": repr(err)},
) from err
self.board_info = await self.client.async_get_board_info()
except DucoConnectionError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
@@ -86,6 +70,20 @@ class DucoCoordinator(DataUpdateCoordinator[DucoData]):
"""Fetch node data from the Duco box."""
try:
nodes = await self.client.async_get_nodes()
except DucoConnectionError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="cannot_connect",
translation_placeholders={"error": repr(err)},
) from err
except DucoError as err:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="api_error",
translation_placeholders={"error": repr(err)},
) from err
try:
lan_info = await self.client.async_get_lan_info()
except DucoConnectionError as err:
raise UpdateFailed(
+2 -7
View File
@@ -6,13 +6,11 @@
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]",
"unique_id_mismatch": "The device you entered belongs to a different Duco box.",
"unknown": "[%key:common::config_flow::error::unknown%]",
"unsupported_board": "This Duco system is not supported by this integration. The integration requires a Duco Connectivity Board running public API 2.1 or newer."
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"unknown": "[%key:common::config_flow::error::unknown%]",
"unsupported_board": "[%key:component::duco::config::abort::unsupported_board%]"
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"step": {
"discovery_confirm": {
@@ -100,9 +98,6 @@
},
"rate_limit_exceeded": {
"message": "The Duco device has reached its daily write limit. Try again tomorrow."
},
"unsupported_board": {
"message": "[%key:component::duco::config::abort::unsupported_board%]"
}
},
"system_health": {
@@ -1,58 +0,0 @@
"""Validation helpers for supported Duco systems."""
from awesomeversion import (
AwesomeVersion,
AwesomeVersionStrategy,
AwesomeVersionStrategyException,
)
from duco_connectivity import DucoClient
from duco_connectivity.exceptions import DucoResponseError
from duco_connectivity.models import BoardInfo
# Newer Connectivity boards expose /info with PublicApiVersion. We use that
# endpoint to distinguish supported Connectivity hardware from older
# Communication board V1 hardware.
_MIN_PUBLIC_API_VERSION = AwesomeVersion(
"2.1", ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
)
class UnsupportedBoardError(Exception):
"""Raised when the Duco system is not supported by this integration."""
def validate_board_support(board_info: BoardInfo) -> None:
"""Raise UnsupportedBoardError if the board does not meet support requirements."""
version = board_info.public_api_version
if version is None:
raise UnsupportedBoardError("Board did not report a public API version")
try:
parsed_version = AwesomeVersion(
version, ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
)
except AwesomeVersionStrategyException as err:
raise UnsupportedBoardError(
f"Board reported malformed public API version: {version}"
) from err
if parsed_version < _MIN_PUBLIC_API_VERSION:
raise UnsupportedBoardError(
"Board public API version "
f"{version} is below the supported minimum {_MIN_PUBLIC_API_VERSION}"
)
async def async_get_supported_board_info(client: DucoClient) -> BoardInfo:
"""Fetch and validate board info for a supported Duco system."""
try:
board_info = await client.async_get_board_info()
except DucoResponseError as err:
if err.status == 404:
# Duco indicated that Communication board V1 does not implement
# /info, so a 404 is enough to treat the device as unsupported.
raise UnsupportedBoardError(
"Board does not expose the /info endpoint"
) from err
raise
validate_board_support(board_info)
return board_info
+1 -1
View File
@@ -10,5 +10,5 @@
"iot_class": "cloud_push",
"loggers": ["yoto_api"],
"quality_scale": "bronze",
"requirements": ["yoto-api==3.1.3"]
"requirements": ["yoto-api==3.1.4"]
}
+256 -2
View File
@@ -4,22 +4,28 @@ from collections.abc import Awaitable, Callable
from datetime import datetime
from typing import Any
from yoto_api import Card, PlaybackStatus, YotoError, YotoPlayer
from yoto_api import Card, Chapter, PlaybackStatus, Track, YotoError, YotoPlayer
from homeassistant.components.media_player import (
BrowseError,
BrowseMedia,
MediaClass,
MediaPlayerDeviceClass,
MediaPlayerEntity,
MediaPlayerEntityFeature,
MediaPlayerState,
MediaType,
)
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .const import DOMAIN
from .coordinator import YotoConfigEntry, YotoDataUpdateCoordinator
from .entity import YotoEntity
URI_SCHEME = "yoto"
PARALLEL_UPDATES = 0
# Yoto players expose 16 hardware volume steps.
@@ -56,6 +62,8 @@ class YotoMediaPlayer(YotoEntity, MediaPlayerEntity):
MediaPlayerEntityFeature.PLAY
| MediaPlayerEntityFeature.PAUSE
| MediaPlayerEntityFeature.STOP
| MediaPlayerEntityFeature.PLAY_MEDIA
| MediaPlayerEntityFeature.BROWSE_MEDIA
| MediaPlayerEntityFeature.VOLUME_SET
| MediaPlayerEntityFeature.VOLUME_STEP
| MediaPlayerEntityFeature.PREVIOUS_TRACK
@@ -169,6 +177,220 @@ class YotoMediaPlayer(YotoEntity, MediaPlayerEntity):
"""Skip to the previous track on the active card."""
await self._async_run(self.coordinator.client.previous_track, self._player_id)
async def async_play_media(
self, media_type: MediaType | str, media_id: str, **kwargs: Any
) -> None:
"""Play a Yoto card, chapter, or track from the browse tree."""
try:
card_id, chapter_key, track_key = _parse_uri(media_id)
except ValueError as err:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="invalid_media_id",
translation_placeholders={"media_id": media_id},
) from err
client = self.coordinator.client
card = client.library.get(card_id)
if card is None:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="unknown_card",
translation_placeholders={"card_id": card_id},
)
if chapter_key is not None:
# Library list may not include chapters yet; fetch detail on demand.
if not card.chapters:
try:
await client.update_card_detail(card_id)
except YotoError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="card_detail_failed",
translation_placeholders={"error": str(err)},
) from err
chapter = card.chapters.get(chapter_key)
if chapter is None:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="unknown_chapter",
translation_placeholders={
"chapter_key": chapter_key,
"card_id": card_id,
},
)
if track_key is not None and track_key not in chapter.tracks:
raise ServiceValidationError(
translation_domain=DOMAIN,
translation_key="unknown_track",
translation_placeholders={
"track_key": track_key,
"card_id": card_id,
},
)
# A chapter plays from its first track.
if track_key is None and chapter.tracks:
track_key = next(iter(chapter.tracks))
# Chapter/track plays start at 0; a card play keeps its resume point.
seconds_in = 0 if track_key is not None else None
try:
await client.play_card(
self._player_id,
card_id,
chapter_key=chapter_key,
track_key=track_key,
seconds_in=seconds_in,
)
except YotoError as err:
raise HomeAssistantError(
translation_domain=DOMAIN,
translation_key="play_failed",
translation_placeholders={"error": str(err)},
) from err
async def async_browse_media(
self,
media_content_type: MediaType | str | None = None,
media_content_id: str | None = None,
) -> BrowseMedia:
"""Browse the Yoto card library."""
if not media_content_id:
return self._browse_root()
try:
card_id, chapter_key, _ = _parse_uri(media_content_id)
except ValueError as err:
raise BrowseError(
translation_domain=DOMAIN,
translation_key="invalid_media_id",
translation_placeholders={"media_id": media_content_id},
) from err
card = self.coordinator.client.library.get(card_id)
if card is None:
raise BrowseError(
translation_domain=DOMAIN,
translation_key="unknown_card",
translation_placeholders={"card_id": card_id},
)
if not card.chapters:
try:
await self.coordinator.client.update_card_detail(card_id)
except YotoError as err:
raise BrowseError(
translation_domain=DOMAIN,
translation_key="card_detail_failed",
translation_placeholders={"error": str(err)},
) from err
if chapter_key is not None:
chapter = card.chapters.get(chapter_key)
if chapter is None:
raise BrowseError(
translation_domain=DOMAIN,
translation_key="unknown_chapter",
translation_placeholders={
"chapter_key": chapter_key,
"card_id": card_id,
},
)
return self._browse_chapter(card_id, chapter_key, chapter)
return self._browse_card(card)
def _browse_root(self) -> BrowseMedia:
"""List every card in the user's library."""
return BrowseMedia(
media_class=MediaClass.DIRECTORY,
media_content_id="",
media_content_type=MediaType.MUSIC,
title="Yoto library",
can_play=False,
can_expand=True,
children=[
self._card_node(card)
for card in self.coordinator.client.library.values()
],
children_media_class=MediaClass.ALBUM,
)
def _browse_card(self, card: Card) -> BrowseMedia:
"""List a card's chapters, collapsing single-chapter cards to tracks."""
chapters = card.chapters
# Single-chapter cards expand straight to tracks (skip a one-item level).
if len(chapters) == 1:
chapter_key, chapter = next(iter(chapters.items()))
children = [
self._track_node(card.id, chapter_key, track_key, track)
for track_key, track in chapter.tracks.items()
]
else:
children = [
self._chapter_node(card.id, chapter_key, chapter)
for chapter_key, chapter in chapters.items()
]
node = self._card_node(card)
node.children = children
return node
def _browse_chapter(
self, card_id: str, chapter_key: str, chapter: Chapter
) -> BrowseMedia:
"""List the tracks of a chapter."""
node = self._chapter_node(card_id, chapter_key, chapter)
node.can_expand = True
node.children = [
self._track_node(card_id, chapter_key, track_key, track)
for track_key, track in chapter.tracks.items()
]
return node
def _card_node(self, card: Card) -> BrowseMedia:
"""Build a browse node for a card."""
# MUSIC (not ALBUM) so children render in list view with thumbnails.
return BrowseMedia(
media_class=MediaClass.MUSIC,
media_content_id=_build_uri(card.id),
media_content_type=MediaType.MUSIC,
title=card.title or card.id,
can_play=True,
can_expand=True,
thumbnail=card.cover_image_large,
)
def _chapter_node(
self, card_id: str, chapter_key: str, chapter: Chapter
) -> BrowseMedia:
"""Build a browse node for a chapter."""
# Single-track chapters aren't expandable: click plays the track.
return BrowseMedia(
media_class=MediaClass.MUSIC,
media_content_id=_build_uri(card_id, chapter_key),
media_content_type=MediaType.MUSIC,
title=chapter.title or chapter_key,
can_play=True,
can_expand=len(chapter.tracks) > 1,
thumbnail=chapter.icon,
)
def _track_node(
self, card_id: str, chapter_key: str, track_key: str, track: Track
) -> BrowseMedia:
"""Build a browse node for a track."""
return BrowseMedia(
media_class=MediaClass.MUSIC,
media_content_id=_build_uri(card_id, chapter_key, track_key),
media_content_type=MediaType.MUSIC,
title=track.title or track_key,
can_play=True,
can_expand=False,
thumbnail=track.icon,
)
async def _async_run(
self, func: Callable[..., Awaitable[Any]], /, *args: Any
) -> None:
@@ -181,3 +403,35 @@ class YotoMediaPlayer(YotoEntity, MediaPlayerEntity):
translation_key="command_failed",
translation_placeholders={"error": str(err)},
) from err
def _build_uri(
card_id: str,
chapter_key: str | None = None,
track_key: str | None = None,
) -> str:
"""Build a yoto:// URI from card/chapter/track parts."""
segments = [card_id]
if chapter_key is not None:
segments.append(chapter_key)
if track_key is not None:
segments.append(track_key)
return f"{URI_SCHEME}://{'/'.join(segments)}"
def _parse_uri(media_id: str) -> tuple[str, str | None, str | None]:
"""Parse a yoto:// URI into card/chapter/track parts.
Parsed manually because URL parsers lower-case the authority and Yoto
IDs are case-sensitive.
"""
prefix = f"{URI_SCHEME}://"
if not media_id.startswith(prefix):
raise ValueError(f"Not a Yoto media identifier: {media_id}")
parts = [segment for segment in media_id[len(prefix) :].split("/") if segment]
if not parts or len(parts) > 3:
raise ValueError(f"Not a Yoto media identifier: {media_id}")
card_id = parts[0]
chapter_key = parts[1] if len(parts) > 1 else None
track_key = parts[2] if len(parts) > 2 else None
return card_id, chapter_key, track_key
@@ -31,12 +31,30 @@
}
},
"exceptions": {
"card_detail_failed": {
"message": "Could not load Yoto card details: {error}"
},
"command_failed": {
"message": "Yoto command failed: {error}"
},
"invalid_media_id": {
"message": "Not a Yoto media identifier: {media_id}"
},
"oauth2_implementation_unavailable": {
"message": "[%key:common::exceptions::oauth2_implementation_unavailable::message%]"
},
"play_failed": {
"message": "Failed to play Yoto media: {error}"
},
"unknown_card": {
"message": "Unknown Yoto card: {card_id}"
},
"unknown_chapter": {
"message": "Unknown chapter {chapter_key} on card {card_id}"
},
"unknown_track": {
"message": "Unknown track {track_key} on card {card_id}"
},
"update_error": {
"message": "Error communicating with Yoto: {error}"
}
-1
View File
@@ -59,7 +59,6 @@ FLOWS = {
"amberelectric",
"ambient_network",
"ambient_station",
"analytics",
"analytics_insights",
"android_ip_webcam",
"androidtv",
+1 -1
View File
@@ -3411,7 +3411,7 @@ yeelightsunflower==0.0.10
yolink-api==0.6.5
# homeassistant.components.yoto
yoto-api==3.1.3
yoto-api==3.1.4
# homeassistant.components.youless
youless-api==2.2.0
+1 -151
View File
@@ -6,18 +6,15 @@ from unittest.mock import patch
import pytest
from homeassistant.components.analytics import CONF_SNAPSHOTS_URL, LABS_SNAPSHOT_FEATURE
from homeassistant.components.analytics import LABS_SNAPSHOT_FEATURE
from homeassistant.components.analytics.const import (
BASIC_ENDPOINT_URL,
BASIC_ENDPOINT_URL_DEV,
DOMAIN,
SNAPSHOT_DEFAULT_URL,
SNAPSHOT_URL_PATH,
STORAGE_KEY,
)
from homeassistant.components.hassio import HassioNotReadyError
from homeassistant.components.labs import async_update_preview_feature
from homeassistant.config_entries import ConfigEntryState
from homeassistant.const import EVENT_HOMEASSISTANT_STARTED
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
@@ -40,153 +37,6 @@ async def test_setup(hass: HomeAssistant) -> None:
assert DOMAIN in hass.data
async def test_setup_with_snapshots_url(
hass: HomeAssistant,
hass_storage: dict[str, Any],
hass_ws_client: WebSocketGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test setup with snapshots_url in YAML config sends snapshots to that URL."""
custom_url = "https://custom-snapshot-endpoint.example.com"
snapshot_endpoint = custom_url + SNAPSHOT_URL_PATH
aioclient_mock.post(snapshot_endpoint, status=200, json={})
with patch(
"homeassistant.components.analytics.analytics._async_snapshot_payload",
return_value={"mock": {}},
):
assert await async_setup_component(hass, "labs", {})
assert await async_setup_component(
hass, DOMAIN, {DOMAIN: {CONF_SNAPSHOTS_URL: custom_url}}
)
await hass.async_block_till_done()
ws_client = await hass_ws_client(hass)
await ws_client.send_json_auto_id(
{"type": "analytics/preferences", "preferences": {"snapshots": True}}
)
assert (await ws_client.receive_json())["success"]
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(hours=25))
await hass.async_block_till_done()
assert any(str(call[1]) == snapshot_endpoint for call in aioclient_mock.mock_calls)
async def test_setup_entry_supervisor_not_ready(hass: HomeAssistant) -> None:
"""Test that HassioNotReadyError raises ConfigEntryNotReady."""
with (
patch(
"homeassistant.components.analytics.analytics.is_hassio",
return_value=True,
),
patch(
"homeassistant.components.hassio.get_supervisor_info",
side_effect=HassioNotReadyError,
),
):
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
await hass.async_block_till_done()
entry = hass.config_entries.async_entries(DOMAIN)[0]
assert entry.state is ConfigEntryState.SETUP_RETRY
async def test_schedule_starts_and_sends_analytics(
hass: HomeAssistant,
hass_storage: dict[str, Any],
hass_ws_client: WebSocketGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that the analytics schedule fires and sends analytics after time travel."""
aioclient_mock.post(BASIC_ENDPOINT_URL, status=200)
aioclient_mock.post(BASIC_ENDPOINT_URL_DEV, status=200)
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
await hass.async_block_till_done()
ws_client = await hass_ws_client(hass)
with patch("homeassistant.components.analytics.analytics.HA_VERSION", MOCK_VERSION):
await ws_client.send_json_auto_id(
{"type": "analytics/preferences", "preferences": {"base": True}}
)
assert (await ws_client.receive_json())["success"]
assert len(aioclient_mock.mock_calls) == 0
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=901))
await hass.async_block_till_done()
assert len(aioclient_mock.mock_calls) == 1
async def test_unload_entry(
hass: HomeAssistant,
hass_storage: dict[str, Any],
hass_ws_client: WebSocketGenerator,
aioclient_mock: AiohttpClientMocker,
) -> None:
"""Test that unloading the config entry stops the analytics schedule."""
aioclient_mock.post(BASIC_ENDPOINT_URL, status=200)
aioclient_mock.post(BASIC_ENDPOINT_URL_DEV, status=200)
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
await hass.async_block_till_done()
ws_client = await hass_ws_client(hass)
with patch("homeassistant.components.analytics.analytics.HA_VERSION", MOCK_VERSION):
await ws_client.send_json_auto_id(
{"type": "analytics/preferences", "preferences": {"base": True}}
)
await ws_client.receive_json()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(seconds=901))
await hass.async_block_till_done()
assert len(aioclient_mock.mock_calls) == 1
entry = hass.config_entries.async_entries(DOMAIN)[0]
await hass.config_entries.async_unload(entry.entry_id)
await hass.async_block_till_done()
async_fire_time_changed(hass, dt_util.utcnow() + timedelta(days=2))
await hass.async_block_till_done()
assert len(aioclient_mock.mock_calls) == 1
@pytest.mark.parametrize(
("ws_type", "ws_options"),
[("analytics", {}), ("analytics/preferences", {"preferences": {"base": True}})],
)
async def test_websocket_not_loaded(
hass: HomeAssistant,
hass_ws_client: WebSocketGenerator,
ws_type: str,
ws_options: dict[str, Any],
) -> None:
"""Test websocket returns error when analytics entry failed to load."""
with (
patch(
"homeassistant.components.analytics.analytics.is_hassio",
return_value=True,
),
patch(
"homeassistant.components.hassio.get_supervisor_info",
side_effect=HassioNotReadyError,
),
):
assert await async_setup_component(hass, DOMAIN, {DOMAIN: {}})
await hass.async_block_till_done()
ws_client = await hass_ws_client(hass)
await ws_client.send_json_auto_id({"type": ws_type} | ws_options)
response = await ws_client.receive_json()
assert not response["success"]
assert response["error"]["code"] == "not_found"
@pytest.mark.usefixtures("mock_snapshot_payload")
async def test_labs_feature_toggle(
hass: HomeAssistant,
-42
View File
@@ -30,48 +30,6 @@ TEST_MAC = "aa:bb:cc:dd:ee:ff"
USER_INPUT = {CONF_HOST: TEST_HOST}
UNSUPPORTED_BOARD_INFOS = [
pytest.param(
BoardInfo(
box_name="SILENT_CONNECT",
box_sub_type_name="Eu",
serial_board_box="ABC123",
serial_board_comm="DEF456",
serial_duco_box="GHI789",
serial_duco_comm="JKL012",
time=1700000000,
public_api_version="2.0",
),
id="version-too-low",
),
pytest.param(
BoardInfo(
box_name="SILENT_CONNECT",
box_sub_type_name="Eu",
serial_board_box="ABC123",
serial_board_comm="DEF456",
serial_duco_box="GHI789",
serial_duco_comm="JKL012",
time=1700000000,
public_api_version=None,
),
id="missing-version",
),
pytest.param(
BoardInfo(
box_name="SILENT_CONNECT",
box_sub_type_name="Eu",
serial_board_box="ABC123",
serial_board_comm="DEF456",
serial_duco_box="GHI789",
serial_duco_comm="JKL012",
time=1700000000,
public_api_version="2.1.0-beta",
),
id="malformed-version",
),
]
def _node_from_dict(data: dict[str, Any]) -> Node:
"""Convert a node fixture payload into a Duco node model."""
+2 -250
View File
@@ -3,13 +3,7 @@
from ipaddress import IPv4Address
from unittest.mock import ANY, AsyncMock, patch
from duco_connectivity import (
BoardInfo,
DucoConnectionError,
DucoError,
DucoResponseError,
LanInfo,
)
from duco_connectivity import BoardInfo, DucoConnectionError, DucoError, LanInfo
import pytest
from homeassistant.components.duco.const import DOMAIN
@@ -20,7 +14,7 @@ from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers.service_info.dhcp import DhcpServiceInfo
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from .conftest import TEST_HOST, TEST_MAC, UNSUPPORTED_BOARD_INFOS, USER_INPUT
from .conftest import TEST_HOST, TEST_MAC, USER_INPUT
from tests.common import MockConfigEntry
@@ -40,48 +34,6 @@ DHCP_DISCOVERY = DhcpServiceInfo(
macaddress="aabbccddeeff",
)
_SUPPORTED_BOARD_INFOS = [
pytest.param(
BoardInfo(
box_name="ENERGY",
box_sub_type_name="Eu",
serial_board_box="ABC123",
serial_board_comm="DEF456",
serial_duco_box="GHI789",
serial_duco_comm="JKL012",
time=1700000000,
public_api_version="2.5",
),
id="energy-supported",
),
pytest.param(
BoardInfo(
box_name="FOCUS",
box_sub_type_name="Eu",
serial_board_box="ABC123",
serial_board_comm="DEF456",
serial_duco_box="GHI789",
serial_duco_comm="JKL012",
time=1700000000,
public_api_version="2.5",
),
id="focus-supported",
),
pytest.param(
BoardInfo(
box_name="SOMETHING_NEW",
box_sub_type_name="Eu",
serial_board_box="ABC123",
serial_board_comm="DEF456",
serial_duco_box="GHI789",
serial_duco_comm="JKL012",
time=1700000000,
public_api_version="2.5",
),
id="unknown-box-name-supported",
),
]
@pytest.mark.usefixtures("mock_setup_entry")
async def test_user_flow_success(
@@ -110,8 +62,6 @@ async def test_user_flow_success(
[
(DucoConnectionError("Connection refused"), "cannot_connect"),
(DucoError("Unexpected error"), "unknown"),
(DucoResponseError(500, "/info"), "unknown"),
(DucoResponseError(404, "/info"), "unsupported_board"),
],
)
@pytest.mark.usefixtures("mock_setup_entry")
@@ -240,7 +190,6 @@ async def test_zeroconf_discovery_already_configured_same_ip(
[
(DucoConnectionError("Connection refused"), "cannot_connect"),
(DucoError("Unexpected error"), "unknown"),
(DucoResponseError(404, "/info"), "unsupported_board"),
],
)
async def test_zeroconf_discovery_exceptions(
@@ -336,7 +285,6 @@ async def test_reconfigure_flow_wrong_device(
[
(DucoConnectionError("Connection refused"), "cannot_connect"),
(DucoError("Unexpected error"), "unknown"),
(DucoResponseError(500, "/info"), "unknown"),
],
)
async def test_reconfigure_flow_error(
@@ -369,35 +317,6 @@ async def test_reconfigure_flow_error(
assert result["reason"] == "reconfigure_successful"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_reconfigure_flow_without_info_endpoint(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
mock_config_entry: MockConfigEntry,
) -> None:
"""Test reconfigure flow rejects boards that do not expose the supported API."""
mock_config_entry.add_to_hass(hass)
result = await mock_config_entry.start_reconfigure_flow(hass)
mock_duco_client.async_get_board_info.side_effect = DucoResponseError(404, "/info")
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: "192.168.1.50"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reconfigure"
assert result["errors"] == {"base": "unsupported_board"}
mock_duco_client.async_get_board_info.side_effect = None
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: "192.168.1.200"}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reconfigure_successful"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_dhcp_discovery_new_device(
hass: HomeAssistant, mock_duco_client: AsyncMock
@@ -472,7 +391,6 @@ async def test_dhcp_discovery_already_configured_same_ip(
[
(DucoConnectionError("Connection refused"), "cannot_connect"),
(DucoError("Unexpected error"), "unknown"),
(DucoResponseError(404, "/info"), "unsupported_board"),
],
)
async def test_dhcp_discovery_exceptions(
@@ -531,172 +449,6 @@ async def test_dhcp_discovery_exception_recovery(
assert result["result"].unique_id == TEST_MAC
@pytest.mark.usefixtures("mock_setup_entry")
@pytest.mark.parametrize("unsupported_board_info", UNSUPPORTED_BOARD_INFOS)
async def test_user_flow_unsupported_board_from_board_info(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
mock_board_info: BoardInfo,
unsupported_board_info: BoardInfo,
) -> None:
"""Test user flow shows unsupported_board error when board validation fails."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
mock_duco_client.async_get_board_info.return_value = unsupported_board_info
result = await hass.config_entries.flow.async_configure(
result["flow_id"], USER_INPUT
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "user"
assert result["errors"] == {"base": "unsupported_board"}
mock_duco_client.async_get_board_info.return_value = mock_board_info
result = await hass.config_entries.flow.async_configure(
result["flow_id"], USER_INPUT
)
assert result["type"] is FlowResultType.CREATE_ENTRY
@pytest.mark.usefixtures("mock_setup_entry")
@pytest.mark.parametrize("supported_board_info", _SUPPORTED_BOARD_INFOS)
async def test_user_flow_allows_api_compatible_board_info(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
supported_board_info: BoardInfo,
) -> None:
"""Test user flow allows boards that expose a compatible API version."""
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)
mock_duco_client.async_get_board_info.return_value = supported_board_info
result = await hass.config_entries.flow.async_configure(
result["flow_id"], USER_INPUT
)
assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == str(supported_board_info.box_name)
assert result["result"].unique_id == TEST_MAC
@pytest.mark.usefixtures("mock_setup_entry")
@pytest.mark.parametrize("unsupported_board_info", UNSUPPORTED_BOARD_INFOS)
async def test_reconfigure_flow_unsupported_board_from_board_info(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
mock_config_entry: MockConfigEntry,
mock_board_info: BoardInfo,
unsupported_board_info: BoardInfo,
) -> None:
"""Test reconfigure flow shows unsupported_board when board validation fails."""
mock_config_entry.add_to_hass(hass)
result = await mock_config_entry.start_reconfigure_flow(hass)
mock_duco_client.async_get_board_info.return_value = unsupported_board_info
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: "192.168.1.50"}
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "reconfigure"
assert result["errors"] == {"base": "unsupported_board"}
mock_duco_client.async_get_board_info.return_value = mock_board_info
result = await hass.config_entries.flow.async_configure(
result["flow_id"], {CONF_HOST: "192.168.1.200"}
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "reconfigure_successful"
@pytest.mark.parametrize("supported_board_info", _SUPPORTED_BOARD_INFOS)
async def test_zeroconf_discovery_allows_api_compatible_board_info(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
supported_board_info: BoardInfo,
) -> None:
"""Test zeroconf discovery allows boards that expose a compatible API version."""
mock_duco_client.async_get_board_info.return_value = supported_board_info
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=ZEROCONF_DISCOVERY,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "discovery_confirm"
assert result["description_placeholders"] == {
"name": str(supported_board_info.box_name)
}
@pytest.mark.parametrize("unsupported_board_info", UNSUPPORTED_BOARD_INFOS)
async def test_zeroconf_discovery_unsupported_board_from_board_info(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
unsupported_board_info: BoardInfo,
) -> None:
"""Test zeroconf discovery aborts with unsupported_board when board validation fails."""
mock_duco_client.async_get_board_info.return_value = unsupported_board_info
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_ZEROCONF},
data=ZEROCONF_DISCOVERY,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "unsupported_board"
@pytest.mark.parametrize("supported_board_info", _SUPPORTED_BOARD_INFOS)
async def test_dhcp_discovery_allows_api_compatible_board_info(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
supported_board_info: BoardInfo,
) -> None:
"""Test DHCP discovery allows boards that expose a compatible API version."""
mock_duco_client.async_get_board_info.return_value = supported_board_info
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_DHCP},
data=DHCP_DISCOVERY,
)
assert result["type"] is FlowResultType.FORM
assert result["step_id"] == "discovery_confirm"
assert result["description_placeholders"] == {
"name": str(supported_board_info.box_name)
}
@pytest.mark.parametrize("unsupported_board_info", UNSUPPORTED_BOARD_INFOS)
async def test_dhcp_discovery_unsupported_board_from_board_info(
hass: HomeAssistant,
mock_duco_client: AsyncMock,
unsupported_board_info: BoardInfo,
) -> None:
"""Test DHCP discovery aborts with unsupported_board when board validation fails."""
mock_duco_client.async_get_board_info.return_value = unsupported_board_info
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": SOURCE_DHCP},
data=DHCP_DISCOVERY,
)
assert result["type"] is FlowResultType.ABORT
assert result["reason"] == "unsupported_board"
@pytest.mark.usefixtures("mock_setup_entry")
async def test_user_flow_initializes_client_with_host(
hass: HomeAssistant, mock_board_info: BoardInfo, mock_lan_info: LanInfo
+6 -13
View File
@@ -62,17 +62,18 @@ async def test_diagnostics_connection_error(
assert response.status == HTTPStatus.INTERNAL_SERVER_ERROR
async def test_diagnostics_without_optional_software_version(
async def test_diagnostics_without_optional_board_metadata(
hass: HomeAssistant,
hass_client: ClientSessionGenerator,
mock_config_entry: MockConfigEntry,
mock_duco_client: AsyncMock,
) -> None:
"""Test that an optional software version is omitted from diagnostics."""
"""Test that None board fields are omitted from the diagnostics payload."""
# BoardInfo is a frozen dataclass, so the mock must be updated before
# integration setup — the coordinator stores board_info during async_setup.
mock_duco_client.async_get_board_info.return_value = replace(
mock_duco_client.async_get_board_info.return_value,
public_api_version=None,
software_version=None,
)
mock_config_entry.add_to_hass(hass)
@@ -83,9 +84,7 @@ async def test_diagnostics_without_optional_software_version(
hass, hass_client, mock_config_entry
)
assert diagnostics["board_info"]["public_api_version"] == str(
mock_duco_client.async_get_board_info.return_value.public_api_version
)
assert "public_api_version" not in diagnostics["board_info"]
assert "software_version" not in diagnostics["board_info"]
@@ -97,16 +96,10 @@ async def test_diagnostics_without_optional_api_metadata(
mock_duco_client: AsyncMock,
) -> None:
"""Test diagnostics when optional API metadata is absent."""
mock_duco_client.async_get_api_info.return_value = ApiInfo(
api_version=mock_duco_client.async_get_api_info.return_value.api_version
)
mock_duco_client.async_get_api_info.return_value = ApiInfo(api_version="2.5")
diagnostics = await get_diagnostics_for_config_entry(
hass, hass_client, mock_config_entry
)
assert diagnostics["api_info"] == {
"public_api_version": str(
mock_duco_client.async_get_api_info.return_value.api_version
)
}
assert diagnostics["api_info"] == {"public_api_version": "2.5"}
+8 -64
View File
@@ -8,7 +8,6 @@ from duco_connectivity import (
DiagStatus,
DucoConnectionError,
DucoError,
DucoResponseError,
LanInfo,
Node,
)
@@ -19,47 +18,28 @@ from homeassistant.const import Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_registry as er
from .conftest import TEST_HOST, TEST_MAC, UNSUPPORTED_BOARD_INFOS
from .conftest import TEST_HOST, TEST_MAC
from tests.common import MockConfigEntry
@pytest.mark.parametrize(
(
"method",
"exception",
"expected_state",
"expected_translation_key",
"has_error_translation_placeholder",
),
("method", "exception", "expected_state"),
[
(
"async_get_board_info",
DucoConnectionError("Connection refused"),
ConfigEntryState.SETUP_RETRY,
None,
False,
),
(
"async_get_board_info",
DucoError("Unexpected API error"),
ConfigEntryState.SETUP_ERROR,
"api_error",
True,
),
(
"async_get_board_info",
DucoResponseError(500, "/info"),
ConfigEntryState.SETUP_ERROR,
"api_error",
True,
),
(
"async_get_nodes",
DucoConnectionError("Connection refused"),
ConfigEntryState.SETUP_RETRY,
None,
False,
),
],
)
@@ -70,8 +50,6 @@ async def test_setup_entry_error(
method: str,
exception: Exception,
expected_state: ConfigEntryState,
expected_translation_key: str | None,
has_error_translation_placeholder: bool,
) -> None:
"""Test that fetch errors during setup result in the correct state."""
getattr(mock_duco_client, method).side_effect = exception
@@ -80,13 +58,15 @@ async def test_setup_entry_error(
await hass.async_block_till_done()
assert mock_config_entry.state is expected_state
assert mock_config_entry.error_reason_translation_key == expected_translation_key
if has_error_translation_placeholder:
if (
method == "async_get_board_info"
and isinstance(exception, DucoError)
and expected_state is ConfigEntryState.SETUP_ERROR
):
assert mock_config_entry.error_reason_translation_key == "api_error"
assert mock_config_entry.error_reason_translation_placeholders == {
"error": repr(exception)
}
else:
assert mock_config_entry.error_reason_translation_placeholders is None
@pytest.mark.usefixtures("mock_duco_client")
@@ -98,42 +78,6 @@ async def test_setup_entry_success(
assert init_integration.state is ConfigEntryState.LOADED
@pytest.mark.parametrize("unsupported_board_info", UNSUPPORTED_BOARD_INFOS)
async def test_setup_entry_unsupported_board_info(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_duco_client: AsyncMock,
unsupported_board_info: BoardInfo,
) -> None:
"""Test that unsupported board info blocks setup for existing entries."""
mock_duco_client.async_get_board_info.return_value = unsupported_board_info
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR
assert mock_config_entry.error_reason_translation_key == "unsupported_board"
assert mock_config_entry.error_reason_translation_placeholders is None
async def test_setup_entry_unsupported_board_without_info_endpoint(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
mock_duco_client: AsyncMock,
) -> None:
"""Test that setup fails when the board does not expose /info."""
mock_duco_client.async_get_board_info.side_effect = DucoResponseError(404, "/info")
mock_config_entry.add_to_hass(hass)
await hass.config_entries.async_setup(mock_config_entry.entry_id)
await hass.async_block_till_done()
assert mock_config_entry.state is ConfigEntryState.SETUP_ERROR
assert mock_config_entry.error_reason_translation_key == "unsupported_board"
assert mock_config_entry.error_reason_translation_placeholders is None
async def test_unload_entry(
hass: HomeAssistant,
init_integration: MockConfigEntry,
+23 -1
View File
@@ -9,11 +9,13 @@ import jwt
import pytest
from yoto_api import (
Card,
Chapter,
Device,
PlaybackEvent,
PlaybackStatus,
PlayerInfo,
PlayerStatus,
Track,
YotoPlayer,
)
@@ -36,12 +38,32 @@ ACCESS_TOKEN = jwt.encode({"sub": USER_ID}, "test-secret-long-enough-for-hmac-sh
def _build_card() -> Card:
"""Build a representative Yoto library card."""
"""Build a representative Yoto library card with chapters and tracks."""
return Card(
id=CARD_ID,
title="Outer Space",
author="Ladybird Audio Adventures",
cover_image_large="https://example.test/cover.jpg",
chapters={
"01": Chapter(
key="01",
title="Introduction",
icon="https://example.test/ch01.png",
tracks={
"01-INT": Track(key="01-INT", title="Welcome", duration=120),
"01-MAIN": Track(
key="01-MAIN", title="The Story Begins", duration=240
),
},
),
"02": Chapter(
key="02",
title="Planets",
tracks={
"02-MER": Track(key="02-MER", title="Mercury", duration=180),
},
),
},
)
@@ -31,7 +31,7 @@
'platform': 'yoto',
'previous_unique_id': None,
'suggested_object_id': None,
'supported_features': <MediaPlayerEntityFeature: 21559>,
'supported_features': <MediaPlayerEntityFeature: 153143>,
'translation_key': None,
'unique_id': 'player-test',
'unit_of_measurement': None,
@@ -50,7 +50,7 @@
'media_position': 120,
'media_position_updated_at': datetime.datetime(2026, 5, 8, 12, 0, tzinfo=datetime.timezone.utc),
'media_title': 'Introduction',
'supported_features': <MediaPlayerEntityFeature: 21559>,
'supported_features': <MediaPlayerEntityFeature: 153143>,
'volume_level': 0.5,
}),
'context': <ANY>,
+402 -6
View File
@@ -1,11 +1,12 @@
"""Tests for the Yoto media player platform."""
from typing import Any
from unittest.mock import MagicMock
from freezegun.api import FrozenDateTimeFactory
import pytest
from syrupy.assertion import SnapshotAssertion
from yoto_api import YotoError
from yoto_api import Chapter, Track, YotoError
from homeassistant.components.media_player import (
ATTR_MEDIA_SEEK_POSITION,
@@ -17,22 +18,46 @@ from homeassistant.components.media_player import (
SERVICE_MEDIA_PREVIOUS_TRACK,
SERVICE_MEDIA_SEEK,
SERVICE_MEDIA_STOP,
SERVICE_PLAY_MEDIA,
SERVICE_VOLUME_SET,
MediaPlayerState,
)
from homeassistant.const import ATTR_ENTITY_ID, STATE_UNAVAILABLE
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.exceptions import HomeAssistantError, ServiceValidationError
from homeassistant.helpers import entity_registry as er
from . import setup_integration
from tests.common import MockConfigEntry, snapshot_platform
from tests.typing import WebSocketGenerator
ENTITY_ID = "media_player.nursery_yoto"
pytestmark = pytest.mark.usefixtures("setup_credentials")
def _build_chapters(structure: list[tuple[str, int]]) -> dict[str, Chapter]:
"""Build chapters from a list of ``(chapter_title, track_count)`` tuples."""
chapters = {}
for index, (title, track_count) in enumerate(structure, start=1):
chapter_key = f"{index:02d}"
chapters[chapter_key] = Chapter(
key=chapter_key,
title=title,
icon=f"https://example.test/ch{chapter_key}.png",
tracks={
f"{chapter_key}-{track:02d}": Track(
key=f"{chapter_key}-{track:02d}",
title=f"{title} - Track {track}",
duration=60,
)
for track in range(1, track_count + 1)
},
)
return chapters
@pytest.mark.usefixtures("mock_token_hex", "mock_yoto_client")
async def test_entity_state(
hass: HomeAssistant,
@@ -160,22 +185,393 @@ async def test_state_idle_before_first_event(
state = hass.states.get(ENTITY_ID)
assert state is not None
assert state.state == "idle"
assert state.state == MediaPlayerState.IDLE
@pytest.mark.parametrize(
("media_content_id", "expected_call"),
[
(
"yoto://card-test",
{"chapter_key": None, "track_key": None, "seconds_in": None},
),
(
"yoto://card-test/01",
{"chapter_key": "01", "track_key": "01-INT", "seconds_in": 0},
),
(
"yoto://card-test/01/01-INT",
{"chapter_key": "01", "track_key": "01-INT", "seconds_in": 0},
),
],
)
async def test_play_media(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
media_content_id: str,
expected_call: dict[str, Any],
) -> None:
"""play_media routes a yoto:// URI to the right play_card call."""
await setup_integration(hass, mock_config_entry)
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: ENTITY_ID,
"media_content_type": "music",
"media_content_id": media_content_id,
},
blocking=True,
)
mock_yoto_client.play_card.assert_called_once_with(
"player-test", "card-test", **expected_call
)
@pytest.mark.usefixtures("mock_yoto_client")
@pytest.mark.parametrize(
"media_content_id",
[
pytest.param("spotify:track:abc", id="wrong_scheme"),
pytest.param("yoto://", id="empty_path"),
pytest.param("yoto://card/chapter/track/extra", id="too_many_segments"),
],
)
async def test_play_media_invalid_uri_raises(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
media_content_id: str,
) -> None:
"""A media_id that isn't a complete yoto:// URI is rejected."""
await setup_integration(hass, mock_config_entry)
with pytest.raises(ServiceValidationError):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: ENTITY_ID,
"media_content_type": "music",
"media_content_id": media_content_id,
},
blocking=True,
)
@pytest.mark.parametrize(
"media_content_id",
[
pytest.param("yoto://does-not-exist", id="unknown_card"),
pytest.param("yoto://card-test/does-not-exist", id="unknown_chapter"),
pytest.param("yoto://card-test/01/does-not-exist", id="unknown_track"),
],
)
async def test_play_media_unknown_target_raises(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
media_content_id: str,
) -> None:
"""A yoto:// URI pointing at unknown content is rejected."""
await setup_integration(hass, mock_config_entry)
with pytest.raises(ServiceValidationError):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_PLAY_MEDIA,
{
ATTR_ENTITY_ID: ENTITY_ID,
"media_content_type": "music",
"media_content_id": media_content_id,
},
blocking=True,
)
mock_yoto_client.play_card.assert_not_called()
@pytest.mark.usefixtures("mock_yoto_client")
async def test_browse_media_root_lists_cards(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Browsing without a content id lists every library card."""
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{"id": 1, "type": "media_player/browse_media", "entity_id": ENTITY_ID}
)
response = await client.receive_json()
assert response["success"]
children = response["result"]["children"]
assert len(children) == 1
assert children[0]["title"] == "Outer Space"
assert children[0]["media_content_id"] == "yoto://card-test"
assert children[0]["can_play"] is True
assert children[0]["can_expand"] is True
async def test_browse_card_with_multiple_chapters_and_multiple_tracks(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""N-N: multi-chapter card, multi-track chapters: list expandable chapters."""
card = mock_yoto_client.library["card-test"]
card.chapters = _build_chapters([("Intro", 2), ("Planets", 3)])
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "music",
"media_content_id": "yoto://card-test",
}
)
response = await client.receive_json()
assert response["success"]
children = response["result"]["children"]
assert [c["title"] for c in children] == ["Intro", "Planets"]
assert all(c["can_expand"] for c in children)
async def test_browse_card_with_multiple_chapters_and_single_track(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""N-1: multi-chapter card, single-track chapters: list non-expandable chapters."""
card = mock_yoto_client.library["card-test"]
card.chapters = _build_chapters([("Song A", 1), ("Song B", 1), ("Song C", 1)])
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "music",
"media_content_id": "yoto://card-test",
}
)
response = await client.receive_json()
assert response["success"]
children = response["result"]["children"]
assert [c["title"] for c in children] == ["Song A", "Song B", "Song C"]
assert not any(c["can_expand"] for c in children)
async def test_browse_card_with_single_chapter_collapses_to_tracks(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""1-N: single-chapter card expands straight to tracks (skips chapter level)."""
card = mock_yoto_client.library["card-test"]
card.chapters = _build_chapters([("Only chapter", 3)])
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "music",
"media_content_id": "yoto://card-test",
}
)
response = await client.receive_json()
assert response["success"]
children = response["result"]["children"]
assert [c["title"] for c in children] == [
"Only chapter - Track 1",
"Only chapter - Track 2",
"Only chapter - Track 3",
]
assert children[0]["media_content_id"] == "yoto://card-test/01/01-01"
@pytest.mark.usefixtures("mock_yoto_client")
async def test_browse_media_chapter_shows_tracks(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Browsing a chapter lists its tracks."""
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "playlist",
"media_content_id": "yoto://card-test/01",
}
)
response = await client.receive_json()
assert response["success"]
children = response["result"]["children"]
assert [c["title"] for c in children] == ["Welcome", "The Story Begins"]
assert children[0]["media_content_id"] == "yoto://card-test/01/01-INT"
async def test_browse_media_fetches_card_detail_lazily(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Browsing a card without loaded chapters triggers update_card_detail."""
card = mock_yoto_client.library["card-test"]
card.chapters = {}
async def _populate(card_id: str) -> None:
card.chapters = {"01": Chapter(key="01", title="Intro", tracks={})}
mock_yoto_client.update_card_detail.side_effect = _populate
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "album",
"media_content_id": "yoto://card-test",
}
)
response = await client.receive_json()
assert response["success"]
mock_yoto_client.update_card_detail.assert_called_once_with("card-test")
@pytest.mark.usefixtures("mock_yoto_client")
async def test_browse_media_unknown_card_raises(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Browsing a card that's not in the library returns a browse error."""
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "album",
"media_content_id": "yoto://does-not-exist",
}
)
response = await client.receive_json()
assert response["success"] is False
@pytest.mark.usefixtures("mock_yoto_client")
async def test_browse_media_unknown_chapter_raises(
hass: HomeAssistant,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""Browsing a chapter that's not in the card returns a browse error."""
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "playlist",
"media_content_id": "yoto://card-test/does-not-exist",
}
)
response = await client.receive_json()
assert response["success"] is False
async def test_browse_media_card_detail_failure_raises(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
hass_ws_client: WebSocketGenerator,
) -> None:
"""A failure fetching card chapters bubbles up as a browse error."""
card = mock_yoto_client.library["card-test"]
card.chapters = {}
mock_yoto_client.update_card_detail.side_effect = YotoError("offline")
await setup_integration(hass, mock_config_entry)
client = await hass_ws_client()
await client.send_json(
{
"id": 1,
"type": "media_player/browse_media",
"entity_id": ENTITY_ID,
"media_content_type": "album",
"media_content_id": "yoto://card-test",
}
)
response = await client.receive_json()
assert response["success"] is False
@pytest.mark.parametrize(
("client_method", "service", "service_data"),
[
pytest.param("pause", SERVICE_MEDIA_PAUSE, {}, id="playback"),
pytest.param(
"play_card",
SERVICE_PLAY_MEDIA,
{"media_content_type": "music", "media_content_id": "yoto://card-test"},
id="play_media",
),
],
)
async def test_command_error_raises(
hass: HomeAssistant,
mock_yoto_client: MagicMock,
mock_config_entry: MockConfigEntry,
client_method: str,
service: str,
service_data: dict[str, Any],
) -> None:
"""Yoto command failures surface as HomeAssistantError."""
await setup_integration(hass, mock_config_entry)
mock_yoto_client.pause.side_effect = YotoError("nope")
getattr(mock_yoto_client, client_method).side_effect = YotoError("nope")
with pytest.raises(HomeAssistantError):
await hass.services.async_call(
MEDIA_PLAYER_DOMAIN,
SERVICE_MEDIA_PAUSE,
{ATTR_ENTITY_ID: ENTITY_ID},
service,
{ATTR_ENTITY_ID: ENTITY_ID, **service_data},
blocking=True,
)