Add dlna_dms integration to support DLNA Digital Media Servers (#66437)

This commit is contained in:
Michael Chisholm
2022-02-22 10:14:08 +11:00
committed by GitHub
parent 95de1dd446
commit b19bf9b147
20 changed files with 3610 additions and 0 deletions

View File

@ -214,6 +214,8 @@ homeassistant/components/digital_ocean/* @fabaff
homeassistant/components/discogs/* @thibmaek
homeassistant/components/dlna_dmr/* @StevenLooman @chishm
tests/components/dlna_dmr/* @StevenLooman @chishm
homeassistant/components/dlna_dms/* @chishm
tests/components/dlna_dms/* @chishm
homeassistant/components/dnsip/* @gjohansson-ST
tests/components/dnsip/* @gjohansson-ST
homeassistant/components/doorbird/* @oblogic7 @bdraco @flacjacket

View File

@ -0,0 +1,28 @@
"""The DLNA Digital Media Server integration.
A single config entry is used, with SSDP discovery for media servers. Each
server is wrapped in a DmsEntity, and the server's USN is used as the unique_id.
"""
from __future__ import annotations
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from .const import LOGGER
from .dms import get_domain_data
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up DLNA DMS device from a config entry."""
LOGGER.debug("Setting up config entry: %s", entry.unique_id)
# Forward setup to this domain's data manager
return await get_domain_data(hass).async_setup_entry(entry)
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
LOGGER.debug("Unloading config entry: %s", entry.unique_id)
# Forward unload to this domain's data manager
return await get_domain_data(hass).async_unload_entry(entry)

View File

@ -0,0 +1,177 @@
"""Config flow for DLNA DMS."""
from __future__ import annotations
import logging
from pprint import pformat
from typing import Any, cast
from urllib.parse import urlparse
from async_upnp_client.profiles.dlna import DmsDevice
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.components import ssdp
from homeassistant.const import CONF_DEVICE_ID, CONF_HOST, CONF_URL
from homeassistant.data_entry_flow import AbortFlow, FlowResult
from homeassistant.exceptions import IntegrationError
from .const import DEFAULT_NAME, DOMAIN
LOGGER = logging.getLogger(__name__)
class ConnectError(IntegrationError):
"""Error occurred when trying to connect to a device."""
class DlnaDmsFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
"""Handle a DLNA DMS config flow.
The Unique Service Name (USN) of the DMS device is used as the unique_id for
config entries and for entities. This USN may differ from the root USN if
the DMS is an embedded device.
"""
VERSION = 1
def __init__(self) -> None:
"""Initialize flow."""
self._discoveries: dict[str, ssdp.SsdpServiceInfo] = {}
self._location: str | None = None
self._usn: str | None = None
self._name: str | None = None
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Handle a flow initialized by the user by listing unconfigured devices."""
LOGGER.debug("async_step_user: user_input: %s", user_input)
if user_input is not None and (host := user_input.get(CONF_HOST)):
# User has chosen a device
discovery = self._discoveries[host]
await self._async_parse_discovery(discovery)
return self._create_entry()
if not (discoveries := await self._async_get_discoveries()):
# Nothing found, abort configuration
return self.async_abort(reason="no_devices_found")
self._discoveries = {
cast(str, urlparse(discovery.ssdp_location).hostname): discovery
for discovery in discoveries
}
discovery_choices = {
host: f"{discovery.upnp.get(ssdp.ATTR_UPNP_FRIENDLY_NAME)} ({host})"
for host, discovery in self._discoveries.items()
}
data_schema = vol.Schema({vol.Optional(CONF_HOST): vol.In(discovery_choices)})
return self.async_show_form(step_id="user", data_schema=data_schema)
async def async_step_ssdp(self, discovery_info: ssdp.SsdpServiceInfo) -> FlowResult:
"""Handle a flow initialized by SSDP discovery."""
LOGGER.debug("async_step_ssdp: discovery_info %s", pformat(discovery_info))
await self._async_parse_discovery(discovery_info)
# Abort if the device doesn't support all services required for a DmsDevice.
# Use the discovery_info instead of DmsDevice.is_profile_device to avoid
# contacting the device again.
discovery_service_list = discovery_info.upnp.get(ssdp.ATTR_UPNP_SERVICE_LIST)
if not discovery_service_list:
return self.async_abort(reason="not_dms")
discovery_service_ids = {
service.get("serviceId")
for service in discovery_service_list.get("service") or []
}
if not DmsDevice.SERVICE_IDS.issubset(discovery_service_ids):
return self.async_abort(reason="not_dms")
# Abort if another config entry has the same location, in case the
# device doesn't have a static and unique UDN (breaking the UPnP spec).
self._async_abort_entries_match({CONF_URL: self._location})
self.context["title_placeholders"] = {"name": self._name}
return await self.async_step_confirm()
async def async_step_confirm(
self, user_input: dict[str, Any] | None = None
) -> FlowResult:
"""Allow the user to confirm adding the device."""
LOGGER.debug("async_step_confirm: %s", user_input)
if user_input is not None:
return self._create_entry()
self._set_confirm_only()
return self.async_show_form(step_id="confirm")
def _create_entry(self) -> FlowResult:
"""Create a config entry, assuming all required information is now known."""
LOGGER.debug(
"_async_create_entry: location: %s, USN: %s", self._location, self._usn
)
assert self._name
assert self._location
assert self._usn
data = {CONF_URL: self._location, CONF_DEVICE_ID: self._usn}
return self.async_create_entry(title=self._name, data=data)
async def _async_parse_discovery(
self, discovery_info: ssdp.SsdpServiceInfo
) -> None:
"""Get required details from an SSDP discovery.
Aborts if a device matching the SSDP USN has already been configured.
"""
LOGGER.debug(
"_async_parse_discovery: location: %s, USN: %s",
discovery_info.ssdp_location,
discovery_info.ssdp_usn,
)
if not discovery_info.ssdp_location or not discovery_info.ssdp_usn:
raise AbortFlow("bad_ssdp")
if not self._location:
self._location = discovery_info.ssdp_location
self._usn = discovery_info.ssdp_usn
await self.async_set_unique_id(self._usn)
# Abort if already configured, but update the last-known location
self._abort_if_unique_id_configured(
updates={CONF_URL: self._location}, reload_on_update=False
)
self._name = (
discovery_info.upnp.get(ssdp.ATTR_UPNP_FRIENDLY_NAME)
or urlparse(self._location).hostname
or DEFAULT_NAME
)
async def _async_get_discoveries(self) -> list[ssdp.SsdpServiceInfo]:
"""Get list of unconfigured DLNA devices discovered by SSDP."""
LOGGER.debug("_get_discoveries")
# Get all compatible devices from ssdp's cache
discoveries: list[ssdp.SsdpServiceInfo] = []
for udn_st in DmsDevice.DEVICE_TYPES:
st_discoveries = await ssdp.async_get_discovery_info_by_st(
self.hass, udn_st
)
discoveries.extend(st_discoveries)
# Filter out devices already configured
current_unique_ids = {
entry.unique_id
for entry in self._async_current_entries(include_ignore=False)
}
discoveries = [
disc for disc in discoveries if disc.ssdp_udn not in current_unique_ids
]
return discoveries

View File

@ -0,0 +1,78 @@
"""Constants for the DLNA MediaServer integration."""
from __future__ import annotations
from collections.abc import Mapping
import logging
from typing import Final
from homeassistant.components.media_player import const as _mp_const
LOGGER = logging.getLogger(__package__)
DOMAIN: Final = "dlna_dms"
DEFAULT_NAME: Final = "DLNA Media Server"
SOURCE_SEP: Final = "/"
ROOT_OBJECT_ID: Final = "0"
PATH_SEP: Final = "/"
PATH_SEARCH_FLAG: Final = "?"
PATH_OBJECT_ID_FLAG: Final = ":"
# Only request the metadata needed to build a browse response
DLNA_BROWSE_FILTER: Final = [
"id",
"upnp:class",
"dc:title",
"res",
"@childCount",
"upnp:albumArtURI",
]
# Get all metadata when resolving, for the use of media_players
DLNA_RESOLVE_FILTER: Final = "*"
# Metadata needed to resolve a path
DLNA_PATH_FILTER: Final = ["id", "upnp:class", "dc:title"]
DLNA_SORT_CRITERIA: Final = ["+upnp:class", "+upnp:originalTrackNumber", "+dc:title"]
PROTOCOL_HTTP: Final = "http-get"
PROTOCOL_RTSP: Final = "rtsp-rtp-udp"
PROTOCOL_ANY: Final = "*"
STREAMABLE_PROTOCOLS: Final = [PROTOCOL_HTTP, PROTOCOL_RTSP, PROTOCOL_ANY]
# Map UPnP object class to media_player media class
MEDIA_CLASS_MAP: Mapping[str, str] = {
"object": _mp_const.MEDIA_CLASS_URL,
"object.item": _mp_const.MEDIA_CLASS_URL,
"object.item.imageItem": _mp_const.MEDIA_CLASS_IMAGE,
"object.item.imageItem.photo": _mp_const.MEDIA_CLASS_IMAGE,
"object.item.audioItem": _mp_const.MEDIA_CLASS_MUSIC,
"object.item.audioItem.musicTrack": _mp_const.MEDIA_CLASS_MUSIC,
"object.item.audioItem.audioBroadcast": _mp_const.MEDIA_CLASS_MUSIC,
"object.item.audioItem.audioBook": _mp_const.MEDIA_CLASS_PODCAST,
"object.item.videoItem": _mp_const.MEDIA_CLASS_VIDEO,
"object.item.videoItem.movie": _mp_const.MEDIA_CLASS_MOVIE,
"object.item.videoItem.videoBroadcast": _mp_const.MEDIA_CLASS_TV_SHOW,
"object.item.videoItem.musicVideoClip": _mp_const.MEDIA_CLASS_VIDEO,
"object.item.playlistItem": _mp_const.MEDIA_CLASS_TRACK,
"object.item.textItem": _mp_const.MEDIA_CLASS_URL,
"object.item.bookmarkItem": _mp_const.MEDIA_CLASS_URL,
"object.item.epgItem": _mp_const.MEDIA_CLASS_EPISODE,
"object.item.epgItem.audioProgram": _mp_const.MEDIA_CLASS_MUSIC,
"object.item.epgItem.videoProgram": _mp_const.MEDIA_CLASS_VIDEO,
"object.container": _mp_const.MEDIA_CLASS_DIRECTORY,
"object.container.person": _mp_const.MEDIA_CLASS_ARTIST,
"object.container.person.musicArtist": _mp_const.MEDIA_CLASS_ARTIST,
"object.container.playlistContainer": _mp_const.MEDIA_CLASS_PLAYLIST,
"object.container.album": _mp_const.MEDIA_CLASS_ALBUM,
"object.container.album.musicAlbum": _mp_const.MEDIA_CLASS_ALBUM,
"object.container.album.photoAlbum": _mp_const.MEDIA_CLASS_ALBUM,
"object.container.genre": _mp_const.MEDIA_CLASS_GENRE,
"object.container.genre.musicGenre": _mp_const.MEDIA_CLASS_GENRE,
"object.container.genre.movieGenre": _mp_const.MEDIA_CLASS_GENRE,
"object.container.channelGroup": _mp_const.MEDIA_CLASS_CHANNEL,
"object.container.channelGroup.audioChannelGroup": _mp_const.MEDIA_TYPE_CHANNELS,
"object.container.channelGroup.videoChannelGroup": _mp_const.MEDIA_TYPE_CHANNELS,
"object.container.epgContainer": _mp_const.MEDIA_CLASS_DIRECTORY,
"object.container.storageSystem": _mp_const.MEDIA_CLASS_DIRECTORY,
"object.container.storageVolume": _mp_const.MEDIA_CLASS_DIRECTORY,
"object.container.storageFolder": _mp_const.MEDIA_CLASS_DIRECTORY,
"object.container.bookmarkFolder": _mp_const.MEDIA_CLASS_DIRECTORY,
}

View File

@ -0,0 +1,726 @@
"""Wrapper for media_source around async_upnp_client's DmsDevice ."""
from __future__ import annotations
import asyncio
from collections.abc import Callable, Coroutine
from dataclasses import dataclass
import functools
from typing import Any, TypeVar, cast
from async_upnp_client import UpnpEventHandler, UpnpFactory, UpnpRequester
from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.const import NotificationSubType
from async_upnp_client.exceptions import UpnpActionError, UpnpConnectionError, UpnpError
from async_upnp_client.profiles.dlna import ContentDirectoryErrorCode, DmsDevice
from didl_lite import didl_lite
from homeassistant.backports.enum import StrEnum
from homeassistant.components import ssdp
from homeassistant.components.media_player.const import MEDIA_CLASS_DIRECTORY
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source.error import Unresolvable
from homeassistant.components.media_source.models import BrowseMediaSource, PlayMedia
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DEVICE_ID, CONF_URL
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import aiohttp_client
from homeassistant.util import slugify
from .const import (
DLNA_BROWSE_FILTER,
DLNA_PATH_FILTER,
DLNA_RESOLVE_FILTER,
DLNA_SORT_CRITERIA,
DOMAIN,
LOGGER,
MEDIA_CLASS_MAP,
PATH_OBJECT_ID_FLAG,
PATH_SEARCH_FLAG,
PATH_SEP,
ROOT_OBJECT_ID,
STREAMABLE_PROTOCOLS,
)
_DlnaDmsDeviceMethod = TypeVar("_DlnaDmsDeviceMethod", bound="DmsDeviceSource")
_RetType = TypeVar("_RetType")
class DlnaDmsData:
"""Storage class for domain global data."""
hass: HomeAssistant
lock: asyncio.Lock
requester: UpnpRequester
upnp_factory: UpnpFactory
event_handler: UpnpEventHandler
devices: dict[str, DmsDeviceSource] # Indexed by config_entry.unique_id
sources: dict[str, DmsDeviceSource] # Indexed by source_id
def __init__(
self,
hass: HomeAssistant,
) -> None:
"""Initialize global data."""
self.hass = hass
self.lock = asyncio.Lock()
session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False)
self.requester = AiohttpSessionRequester(session, with_sleep=True)
self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
# NOTE: event_handler is not actually used, and is only created to
# satisfy the DmsDevice.__init__ signature
self.event_handler = UpnpEventHandler("", self.requester)
self.devices = {}
self.sources = {}
async def async_setup_entry(self, config_entry: ConfigEntry) -> bool:
"""Create a DMS device connection from a config entry."""
assert config_entry.unique_id
async with self.lock:
source_id = self._generate_source_id(config_entry.title)
device = DmsDeviceSource(self.hass, config_entry, source_id)
self.devices[config_entry.unique_id] = device
self.sources[device.source_id] = device
# Update the device when the associated config entry is modified
config_entry.async_on_unload(
config_entry.add_update_listener(self.async_update_entry)
)
await device.async_added_to_hass()
return True
async def async_unload_entry(self, config_entry: ConfigEntry) -> bool:
"""Unload a config entry and disconnect the corresponding DMS device."""
assert config_entry.unique_id
async with self.lock:
device = self.devices.pop(config_entry.unique_id)
del self.sources[device.source_id]
await device.async_will_remove_from_hass()
return True
async def async_update_entry(
self, hass: HomeAssistant, config_entry: ConfigEntry
) -> None:
"""Update a DMS device when the config entry changes."""
assert config_entry.unique_id
async with self.lock:
device = self.devices[config_entry.unique_id]
# Update the source_id to match the new name
del self.sources[device.source_id]
device.source_id = self._generate_source_id(config_entry.title)
self.sources[device.source_id] = device
def _generate_source_id(self, name: str) -> str:
"""Generate a unique source ID.
Caller should hold self.lock when calling this method.
"""
source_id_base = slugify(name)
if source_id_base not in self.sources:
return source_id_base
tries = 1
while (suggested_source_id := f"{source_id_base}_{tries}") in self.sources:
tries += 1
return suggested_source_id
@callback
def get_domain_data(hass: HomeAssistant) -> DlnaDmsData:
"""Obtain this integration's domain data, creating it if needed."""
if DOMAIN in hass.data:
return cast(DlnaDmsData, hass.data[DOMAIN])
data = DlnaDmsData(hass)
hass.data[DOMAIN] = data
return data
@dataclass
class DidlPlayMedia(PlayMedia):
"""Playable media with DIDL metadata."""
didl_metadata: didl_lite.DidlObject
class DlnaDmsDeviceError(BrowseError, Unresolvable):
"""Base for errors raised by DmsDeviceSource.
Caught by both media_player (BrowseError) and media_source (Unresolvable),
so DmsDeviceSource methods can be used for both browse and resolve
functionality.
"""
class DeviceConnectionError(DlnaDmsDeviceError):
"""Error occurred with the connection to the server."""
class ActionError(DlnaDmsDeviceError):
"""Error when calling a UPnP Action on the device."""
def catch_request_errors(
func: Callable[[_DlnaDmsDeviceMethod, str], Coroutine[Any, Any, _RetType]]
) -> Callable[[_DlnaDmsDeviceMethod, str], Coroutine[Any, Any, _RetType]]:
"""Catch UpnpError errors."""
@functools.wraps(func)
async def wrapper(self: _DlnaDmsDeviceMethod, req_param: str) -> _RetType:
"""Catch UpnpError errors and check availability before and after request."""
if not self.available:
LOGGER.warning("Device disappeared when trying to call %s", func.__name__)
raise DeviceConnectionError("DMS is not connected")
try:
return await func(self, req_param)
except UpnpActionError as err:
LOGGER.debug("Server failure", exc_info=err)
if err.error_code == ContentDirectoryErrorCode.NO_SUCH_OBJECT:
LOGGER.debug("No such object: %s", req_param)
raise ActionError(f"No such object: {req_param}") from err
if err.error_code == ContentDirectoryErrorCode.INVALID_SEARCH_CRITERIA:
LOGGER.debug("Invalid query: %s", req_param)
raise ActionError(f"Invalid query: {req_param}") from err
raise DeviceConnectionError(f"Server failure: {err!r}") from err
except UpnpConnectionError as err:
LOGGER.debug("Server disconnected", exc_info=err)
await self.device_disconnect()
raise DeviceConnectionError(f"Server disconnected: {err!r}") from err
except UpnpError as err:
LOGGER.debug("Server communication failure", exc_info=err)
raise DeviceConnectionError(
f"Server communication failure: {err!r}"
) from err
return wrapper
class DmsDeviceSource:
"""DMS Device wrapper, providing media files as a media_source."""
hass: HomeAssistant
config_entry: ConfigEntry
# Unique slug used for media-source URIs
source_id: str
# Last known URL for the device, used when adding this wrapper to hass to
# try to connect before SSDP has rediscovered it, or when SSDP discovery
# fails.
location: str | None
_device_lock: asyncio.Lock # Held when connecting or disconnecting the device
_device: DmsDevice | None = None
# Only try to connect once when an ssdp:alive advertisement is received
_ssdp_connect_failed: bool = False
# Track BOOTID in SSDP advertisements for device changes
_bootid: int | None = None
def __init__(
self, hass: HomeAssistant, config_entry: ConfigEntry, source_id: str
) -> None:
"""Initialize a DMS Source."""
self.hass = hass
self.config_entry = config_entry
self.source_id = source_id
self.location = self.config_entry.data[CONF_URL]
self._device_lock = asyncio.Lock()
# Callbacks and events
async def async_added_to_hass(self) -> None:
"""Handle addition of this source."""
# Try to connect to the last known location, but don't worry if not available
if not self._device and self.location:
try:
await self.device_connect()
except UpnpError as err:
LOGGER.debug("Couldn't connect immediately: %r", err)
# Get SSDP notifications for only this device
self.config_entry.async_on_unload(
await ssdp.async_register_callback(
self.hass, self.async_ssdp_callback, {"USN": self.usn}
)
)
# async_upnp_client.SsdpListener only reports byebye once for each *UDN*
# (device name) which often is not the USN (service within the device)
# that we're interested in. So also listen for byebye advertisements for
# the UDN, which is reported in the _udn field of the combined_headers.
self.config_entry.async_on_unload(
await ssdp.async_register_callback(
self.hass,
self.async_ssdp_callback,
{"_udn": self.udn, "NTS": NotificationSubType.SSDP_BYEBYE},
)
)
async def async_will_remove_from_hass(self) -> None:
"""Handle removal of this source."""
await self.device_disconnect()
async def async_ssdp_callback(
self, info: ssdp.SsdpServiceInfo, change: ssdp.SsdpChange
) -> None:
"""Handle notification from SSDP of device state change."""
LOGGER.debug(
"SSDP %s notification of device %s at %s",
change,
info.ssdp_usn,
info.ssdp_location,
)
try:
bootid_str = info.ssdp_headers[ssdp.ATTR_SSDP_BOOTID]
bootid: int | None = int(bootid_str, 10)
except (KeyError, ValueError):
bootid = None
if change == ssdp.SsdpChange.UPDATE:
# This is an announcement that bootid is about to change
if self._bootid is not None and self._bootid == bootid:
# Store the new value (because our old value matches) so that we
# can ignore subsequent ssdp:alive messages
try:
next_bootid_str = info.ssdp_headers[ssdp.ATTR_SSDP_NEXTBOOTID]
self._bootid = int(next_bootid_str, 10)
except (KeyError, ValueError):
pass
# Nothing left to do until ssdp:alive comes through
return
if self._bootid is not None and self._bootid != bootid:
# Device has rebooted
# Maybe connection will succeed now
self._ssdp_connect_failed = False
if self._device:
# Drop existing connection and maybe reconnect
await self.device_disconnect()
self._bootid = bootid
if change == ssdp.SsdpChange.BYEBYE:
# Device is going away
if self._device:
# Disconnect from gone device
await self.device_disconnect()
# Maybe the next alive message will result in a successful connection
self._ssdp_connect_failed = False
if (
change == ssdp.SsdpChange.ALIVE
and not self._device
and not self._ssdp_connect_failed
):
assert info.ssdp_location
self.location = info.ssdp_location
try:
await self.device_connect()
except UpnpError as err:
self._ssdp_connect_failed = True
LOGGER.warning(
"Failed connecting to recently alive device at %s: %r",
self.location,
err,
)
# Device connection/disconnection
async def device_connect(self) -> None:
"""Connect to the device now that it's available."""
LOGGER.debug("Connecting to device at %s", self.location)
async with self._device_lock:
if self._device:
LOGGER.debug("Trying to connect when device already connected")
return
if not self.location:
LOGGER.debug("Not connecting because location is not known")
return
domain_data = get_domain_data(self.hass)
# Connect to the base UPNP device
upnp_device = await domain_data.upnp_factory.async_create_device(
self.location
)
# Create profile wrapper
self._device = DmsDevice(upnp_device, domain_data.event_handler)
# Update state variables. We don't care if they change, so this is
# only done once, here.
await self._device.async_update()
async def device_disconnect(self) -> None:
"""Destroy connections to the device now that it's not available.
Also call when removing this device wrapper from hass to clean up connections.
"""
async with self._device_lock:
if not self._device:
LOGGER.debug("Disconnecting from device that's not connected")
return
LOGGER.debug("Disconnecting from %s", self._device.name)
self._device = None
# Device properties
@property
def available(self) -> bool:
"""Device is available when we have a connection to it."""
return self._device is not None and self._device.profile_device.available
@property
def usn(self) -> str:
"""Get the USN (Unique Service Name) for the wrapped UPnP device end-point."""
return self.config_entry.data[CONF_DEVICE_ID]
@property
def udn(self) -> str:
"""Get the UDN (Unique Device Name) based on the USN."""
return self.usn.partition("::")[0]
@property
def name(self) -> str:
"""Return a name for the media server."""
return self.config_entry.title
@property
def icon(self) -> str | None:
"""Return an URL to an icon for the media server."""
if not self._device:
return None
return self._device.icon
# MediaSource methods
async def async_resolve_media(self, identifier: str) -> DidlPlayMedia:
"""Resolve a media item to a playable item."""
LOGGER.debug("async_resolve_media(%s)", identifier)
action, parameters = _parse_identifier(identifier)
if action is Action.OBJECT:
return await self.async_resolve_object(parameters)
if action is Action.PATH:
object_id = await self.async_resolve_path(parameters)
return await self.async_resolve_object(object_id)
if action is Action.SEARCH:
return await self.async_resolve_search(parameters)
LOGGER.debug("Invalid identifier %s", identifier)
raise Unresolvable(f"Invalid identifier {identifier}")
async def async_browse_media(self, identifier: str | None) -> BrowseMediaSource:
"""Browse media."""
LOGGER.debug("async_browse_media(%s)", identifier)
action, parameters = _parse_identifier(identifier)
if action is Action.OBJECT:
return await self.async_browse_object(parameters)
if action is Action.PATH:
object_id = await self.async_resolve_path(parameters)
return await self.async_browse_object(object_id)
if action is Action.SEARCH:
return await self.async_browse_search(parameters)
return await self.async_browse_object(ROOT_OBJECT_ID)
# DMS methods
@catch_request_errors
async def async_resolve_object(self, object_id: str) -> DidlPlayMedia:
"""Return a playable media item specified by ObjectID."""
assert self._device
item = await self._device.async_browse_metadata(
object_id, metadata_filter=DLNA_RESOLVE_FILTER
)
# Use the first playable resource
return self._didl_to_play_media(item)
@catch_request_errors
async def async_resolve_path(self, path: str) -> str:
"""Return an Object ID resolved from a path string."""
assert self._device
# Iterate through the path, searching for a matching title within the
# DLNA object hierarchy.
object_id = ROOT_OBJECT_ID
for node in path.split(PATH_SEP):
if not node:
# Skip empty names, for when multiple slashes are involved, e.g //
continue
criteria = (
f'@parentID="{_esc_quote(object_id)}" and dc:title="{_esc_quote(node)}"'
)
try:
result = await self._device.async_search_directory(
object_id,
search_criteria=criteria,
metadata_filter=DLNA_PATH_FILTER,
requested_count=1,
)
except UpnpActionError as err:
LOGGER.debug("Error in call to async_search_directory: %r", err)
if err.error_code == ContentDirectoryErrorCode.NO_SUCH_CONTAINER:
raise Unresolvable(f"No such container: {object_id}") from err
# Search failed, but can still try browsing children
else:
if result.total_matches > 1:
raise Unresolvable(f"Too many items found for {node} in {path}")
if result.result:
object_id = result.result[0].id
continue
# Nothing was found via search, fall back to iterating children
result = await self._device.async_browse_direct_children(
object_id, metadata_filter=DLNA_PATH_FILTER
)
if result.total_matches == 0 or not result.result:
raise Unresolvable(f"No contents for {node} in {path}")
node_lower = node.lower()
for child in result.result:
if child.title.lower() == node_lower:
object_id = child.id
break
else:
# Examining all direct children failed too
raise Unresolvable(f"Nothing found for {node} in {path}")
return object_id
@catch_request_errors
async def async_resolve_search(self, query: str) -> DidlPlayMedia:
"""Return first playable media item found by the query string."""
assert self._device
result = await self._device.async_search_directory(
container_id=ROOT_OBJECT_ID,
search_criteria=query,
metadata_filter=DLNA_RESOLVE_FILTER,
requested_count=1,
)
if result.total_matches == 0 or not result.result:
raise Unresolvable(f"Nothing found for {query}")
# Use the first result, even if it doesn't have a playable resource
item = result.result[0]
if not isinstance(item, didl_lite.DidlObject):
raise Unresolvable(f"{item} is not a DidlObject")
return self._didl_to_play_media(item)
@catch_request_errors
async def async_browse_object(self, object_id: str) -> BrowseMediaSource:
"""Return the contents of a DLNA container by ObjectID."""
assert self._device
base_object = await self._device.async_browse_metadata(
object_id, metadata_filter=DLNA_BROWSE_FILTER
)
children = await self._device.async_browse_direct_children(
object_id,
metadata_filter=DLNA_BROWSE_FILTER,
sort_criteria=DLNA_SORT_CRITERIA,
)
return self._didl_to_media_source(base_object, children)
@catch_request_errors
async def async_browse_search(self, query: str) -> BrowseMediaSource:
"""Return all media items found by the query string."""
assert self._device
result = await self._device.async_search_directory(
container_id=ROOT_OBJECT_ID,
search_criteria=query,
metadata_filter=DLNA_BROWSE_FILTER,
)
children = [
self._didl_to_media_source(child)
for child in result.result
if isinstance(child, didl_lite.DidlObject)
]
media_source = BrowseMediaSource(
domain=DOMAIN,
identifier=self._make_identifier(Action.SEARCH, query),
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type="",
title="Search results",
can_play=False,
can_expand=True,
children=children,
)
media_source.calculate_children_class()
return media_source
def _didl_to_play_media(self, item: didl_lite.DidlObject) -> DidlPlayMedia:
"""Return the first playable resource from a DIDL-Lite object."""
assert self._device
if not item.res:
LOGGER.debug("Object %s has no resources", item.id)
raise Unresolvable("Object has no resources")
for resource in item.res:
if not resource.uri:
continue
if mime_type := _resource_mime_type(resource):
url = self._device.get_absolute_url(resource.uri)
LOGGER.debug("Resolved to url %s MIME %s", url, mime_type)
return DidlPlayMedia(url, mime_type, item)
LOGGER.debug("Object %s has no playable resources", item.id)
raise Unresolvable("Object has no playable resources")
def _didl_to_media_source(
self,
item: didl_lite.DidlObject,
browsed_children: DmsDevice.BrowseResult | None = None,
) -> BrowseMediaSource:
"""Convert a DIDL-Lite object to a browse media source."""
children: list[BrowseMediaSource] | None = None
if browsed_children:
children = [
self._didl_to_media_source(child)
for child in browsed_children.result
if isinstance(child, didl_lite.DidlObject)
]
# Can expand if it has children (even if we don't have them yet), or its
# a container type. Otherwise the front-end will try to play it (even if
# can_play is False).
try:
child_count = int(item.child_count)
except (AttributeError, TypeError, ValueError):
child_count = 0
can_expand = (
bool(children) or child_count > 0 or isinstance(item, didl_lite.Container)
)
# Can play if item has any resource that can be streamed over the network
can_play = any(_resource_is_streaming(res) for res in item.res)
# Use server name for root object, not "root"
title = self.name if item.id == ROOT_OBJECT_ID else item.title
mime_type = _resource_mime_type(item.res[0]) if item.res else None
media_content_type = mime_type or item.upnp_class
media_source = BrowseMediaSource(
domain=DOMAIN,
identifier=self._make_identifier(Action.OBJECT, item.id),
media_class=MEDIA_CLASS_MAP.get(item.upnp_class, ""),
media_content_type=media_content_type,
title=title,
can_play=can_play,
can_expand=can_expand,
children=children,
thumbnail=self._didl_thumbnail_url(item),
)
media_source.calculate_children_class()
return media_source
def _didl_thumbnail_url(self, item: didl_lite.DidlObject) -> str | None:
"""Return absolute URL of a thumbnail for a DIDL-Lite object.
Some objects have the thumbnail in albumArtURI, others in an image
resource.
"""
assert self._device
# Based on DmrDevice.media_image_url from async_upnp_client.
if album_art_uri := getattr(item, "album_art_uri", None):
return self._device.get_absolute_url(album_art_uri)
for resource in item.res:
if not resource.protocol_info or not resource.uri:
continue
if resource.protocol_info.startswith("http-get:*:image/"):
return self._device.get_absolute_url(resource.uri)
return None
def _make_identifier(self, action: Action, object_id: str) -> str:
"""Make an identifier for BrowseMediaSource."""
return f"{self.source_id}/{action}{object_id}"
class Action(StrEnum):
"""Actions that can be specified in a DMS media-source identifier."""
OBJECT = PATH_OBJECT_ID_FLAG
PATH = PATH_SEP
SEARCH = PATH_SEARCH_FLAG
def _parse_identifier(identifier: str | None) -> tuple[Action | None, str]:
"""Parse the media identifier component of a media-source URI."""
if not identifier:
return None, ""
if identifier.startswith(PATH_OBJECT_ID_FLAG):
return Action.OBJECT, identifier[1:]
if identifier.startswith(PATH_SEP):
return Action.PATH, identifier[1:]
if identifier.startswith(PATH_SEARCH_FLAG):
return Action.SEARCH, identifier[1:]
return Action.PATH, identifier
def _resource_is_streaming(resource: didl_lite.Resource) -> bool:
"""Determine if a resource can be streamed across a network."""
# Err on the side of "True" if the protocol info is not available
if not resource.protocol_info:
return True
protocol = resource.protocol_info.split(":")[0].lower()
return protocol.lower() in STREAMABLE_PROTOCOLS
def _resource_mime_type(resource: didl_lite.Resource) -> str | None:
"""Return the MIME type of a resource, if specified."""
# This is the contentFormat portion of the ProtocolInfo for an http-get stream
if not resource.protocol_info:
return None
try:
protocol, _, content_format, _ = resource.protocol_info.split(":", 3)
except ValueError:
return None
if protocol.lower() in STREAMABLE_PROTOCOLS:
return content_format
return None
def _esc_quote(contents: str) -> str:
"""Escape string contents for DLNA search quoted values.
See ContentDirectory:v4, section 4.1.2.
"""
return contents.replace("\\", "\\\\").replace('"', '\\"')

View File

@ -0,0 +1,29 @@
{
"domain": "dlna_dms",
"name": "DLNA Digital Media Server",
"config_flow": true,
"documentation": "https://www.home-assistant.io/integrations/dlna_dms",
"requirements": ["async-upnp-client==0.23.5"],
"dependencies": ["media_source", "ssdp"],
"ssdp": [
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:1",
"st": "urn:schemas-upnp-org:device:MediaServer:1"
},
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:2",
"st": "urn:schemas-upnp-org:device:MediaServer:2"
},
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:3",
"st": "urn:schemas-upnp-org:device:MediaServer:3"
},
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:4",
"st": "urn:schemas-upnp-org:device:MediaServer:4"
}
],
"codeowners": ["@chishm"],
"iot_class": "local_polling",
"quality_scale": "platinum"
}

View File

@ -0,0 +1,126 @@
"""Implementation of DLNA DMS as a media source.
URIs look like "media-source://dlna_dms/<source_id>/<media_identifier>"
Media identifiers can look like:
* `/path/to/file`: slash-separated path through the Content Directory
* `:ObjectID`: colon followed by a server-assigned ID for an object
* `?query`: question mark followed by a query string to search for,
see [DLNA ContentDirectory SearchCriteria](http://www.upnp.org/specs/av/UPnP-av-ContentDirectory-v1-Service.pdf)
for the syntax.
"""
from __future__ import annotations
from homeassistant.components.media_player.const import (
MEDIA_CLASS_CHANNEL,
MEDIA_CLASS_DIRECTORY,
MEDIA_TYPE_CHANNEL,
MEDIA_TYPE_CHANNELS,
)
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source.error import Unresolvable
from homeassistant.components.media_source.models import (
BrowseMediaSource,
MediaSource,
MediaSourceItem,
)
from homeassistant.core import HomeAssistant
from .const import DOMAIN, LOGGER, PATH_OBJECT_ID_FLAG, ROOT_OBJECT_ID, SOURCE_SEP
from .dms import DidlPlayMedia, get_domain_data
async def async_get_media_source(hass: HomeAssistant):
"""Set up DLNA DMS media source."""
LOGGER.debug("Setting up DLNA media sources")
return DmsMediaSource(hass)
class DmsMediaSource(MediaSource):
"""Provide DLNA Media Servers as media sources."""
name = "DLNA Servers"
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize DLNA source."""
super().__init__(DOMAIN)
self.hass = hass
async def async_resolve_media(self, item: MediaSourceItem) -> DidlPlayMedia:
"""Resolve a media item to a playable item."""
dms_data = get_domain_data(self.hass)
if not dms_data.sources:
raise Unresolvable("No sources have been configured")
source_id, media_id = _parse_identifier(item)
if not source_id:
raise Unresolvable(f"No source ID in {item.identifier}")
if not media_id:
raise Unresolvable(f"No media ID in {item.identifier}")
try:
source = dms_data.sources[source_id]
except KeyError as err:
raise Unresolvable(f"Unknown source ID: {source_id}") from err
return await source.async_resolve_media(media_id)
async def async_browse_media(self, item: MediaSourceItem) -> BrowseMediaSource:
"""Browse media."""
dms_data = get_domain_data(self.hass)
if not dms_data.sources:
raise BrowseError("No sources have been configured")
source_id, media_id = _parse_identifier(item)
LOGGER.debug("Browsing for %s / %s", source_id, media_id)
if not source_id and len(dms_data.sources) > 1:
# Browsing the root of dlna_dms with more than one server, return
# all known servers.
base = BrowseMediaSource(
domain=DOMAIN,
identifier="",
media_class=MEDIA_CLASS_DIRECTORY,
media_content_type=MEDIA_TYPE_CHANNELS,
title=self.name,
can_play=False,
can_expand=True,
children_media_class=MEDIA_CLASS_CHANNEL,
)
base.children = [
BrowseMediaSource(
domain=DOMAIN,
identifier=f"{source_id}/{PATH_OBJECT_ID_FLAG}{ROOT_OBJECT_ID}",
media_class=MEDIA_CLASS_CHANNEL,
media_content_type=MEDIA_TYPE_CHANNEL,
title=source.name,
can_play=False,
can_expand=True,
thumbnail=source.icon,
)
for source_id, source in dms_data.sources.items()
]
return base
if not source_id:
# No source specified, default to the first registered
source_id = next(iter(dms_data.sources))
try:
source = dms_data.sources[source_id]
except KeyError as err:
raise BrowseError(f"Unknown source ID: {source_id}") from err
return await source.async_browse_media(media_id)
def _parse_identifier(item: MediaSourceItem) -> tuple[str | None, str | None]:
"""Parse the source_id and media identifier from a media source item."""
if not item.identifier:
return None, None
source_id, _, media_id = item.identifier.partition(SOURCE_SEP)
return source_id or None, media_id or None

View File

@ -0,0 +1,24 @@
{
"config": {
"flow_title": "{name}",
"step": {
"user": {
"title": "Discovered DLNA DMA devices",
"description": "Choose a device to configure",
"data": {
"host": "[%key:common::config_flow::data::host%]"
}
},
"confirm": {
"description": "[%key:common::config_flow::description::confirm_setup%]"
}
},
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"already_in_progress": "[%key:common::config_flow::abort::already_in_progress%]",
"bad_ssdp": "SSDP data is missing a required value",
"no_devices_found": "[%key:common::config_flow::abort::no_devices_found%]",
"not_dms": "Device is not a supported Media Server"
}
}
}

View File

@ -0,0 +1,24 @@
{
"config": {
"abort": {
"already_configured": "Device is already configured",
"already_in_progress": "Configuration flow is already in progress",
"bad_ssdp": "SSDP data is missing a required value",
"no_devices_found": "No devices found on the network",
"not_dms": "Device is not a supported Media Server"
},
"flow_title": "{name}",
"step": {
"confirm": {
"description": "Do you want to start set up?"
},
"user": {
"data": {
"host": "Host"
},
"description": "Choose a device to configure",
"title": "Discovered DLNA DMA devices"
}
}
}
}

View File

@ -72,6 +72,7 @@ FLOWS = [
"dialogflow",
"directv",
"dlna_dmr",
"dlna_dms",
"dnsip",
"doorbird",
"dsmr",

View File

@ -97,6 +97,24 @@ SSDP = {
"st": "urn:schemas-upnp-org:device:MediaRenderer:3"
}
],
"dlna_dms": [
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:1",
"st": "urn:schemas-upnp-org:device:MediaServer:1"
},
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:2",
"st": "urn:schemas-upnp-org:device:MediaServer:2"
},
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:3",
"st": "urn:schemas-upnp-org:device:MediaServer:3"
},
{
"deviceType": "urn:schemas-upnp-org:device:MediaServer:4",
"st": "urn:schemas-upnp-org:device:MediaServer:4"
}
],
"fritz": [
{
"st": "urn:schemas-upnp-org:device:fritzbox:1"

View File

@ -341,6 +341,7 @@ asmog==0.0.6
asterisk_mbox==0.5.0
# homeassistant.components.dlna_dmr
# homeassistant.components.dlna_dms
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight

View File

@ -249,6 +249,7 @@ aprslib==0.7.0
arcam-fmj==0.12.0
# homeassistant.components.dlna_dmr
# homeassistant.components.dlna_dms
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight

View File

@ -0,0 +1 @@
"""Tests for the DLNA MediaServer integration."""

View File

@ -0,0 +1,131 @@
"""Fixtures for DLNA DMS tests."""
from __future__ import annotations
from collections.abc import AsyncGenerator, Iterable
from typing import Final
from unittest.mock import Mock, create_autospec, patch, seal
from async_upnp_client import UpnpDevice, UpnpService
from async_upnp_client.utils import absolute_url
import pytest
from homeassistant.components.dlna_dms.const import DOMAIN
from homeassistant.components.dlna_dms.dms import DlnaDmsData, get_domain_data
from homeassistant.const import CONF_DEVICE_ID, CONF_URL
from homeassistant.core import HomeAssistant
from tests.common import MockConfigEntry
MOCK_DEVICE_HOST: Final = "192.88.99.21"
MOCK_DEVICE_BASE_URL: Final = f"http://{MOCK_DEVICE_HOST}"
MOCK_DEVICE_LOCATION: Final = MOCK_DEVICE_BASE_URL + "/dms_description.xml"
MOCK_DEVICE_NAME: Final = "Test Server Device"
MOCK_DEVICE_TYPE: Final = "urn:schemas-upnp-org:device:MediaServer:1"
MOCK_DEVICE_UDN: Final = "uuid:7bf34520-f034-4fa2-8d2d-2f709d4221ef"
MOCK_DEVICE_USN: Final = f"{MOCK_DEVICE_UDN}::{MOCK_DEVICE_TYPE}"
MOCK_SOURCE_ID: Final = "test_server_device"
LOCAL_IP: Final = "192.88.99.1"
EVENT_CALLBACK_URL: Final = "http://192.88.99.1/notify"
NEW_DEVICE_LOCATION: Final = "http://192.88.99.7" + "/dmr_description.xml"
@pytest.fixture
def upnp_factory_mock() -> Iterable[Mock]:
"""Mock the UpnpFactory class to construct DMS-style UPnP devices."""
with patch(
"homeassistant.components.dlna_dms.dms.UpnpFactory",
autospec=True,
spec_set=True,
) as upnp_factory:
upnp_device = create_autospec(UpnpDevice, instance=True)
upnp_device.name = MOCK_DEVICE_NAME
upnp_device.udn = MOCK_DEVICE_UDN
upnp_device.device_url = MOCK_DEVICE_LOCATION
upnp_device.device_type = MOCK_DEVICE_TYPE
upnp_device.available = True
upnp_device.parent_device = None
upnp_device.root_device = upnp_device
upnp_device.all_devices = [upnp_device]
upnp_device.services = {
"urn:schemas-upnp-org:service:ContentDirectory:1": create_autospec(
UpnpService,
instance=True,
service_type="urn:schemas-upnp-org:service:ContentDirectory:1",
service_id="urn:upnp-org:serviceId:ContentDirectory",
),
"urn:schemas-upnp-org:service:ConnectionManager:1": create_autospec(
UpnpService,
instance=True,
service_type="urn:schemas-upnp-org:service:ConnectionManager:1",
service_id="urn:upnp-org:serviceId:ConnectionManager",
),
}
seal(upnp_device)
upnp_factory_instance = upnp_factory.return_value
upnp_factory_instance.async_create_device.return_value = upnp_device
yield upnp_factory_instance
@pytest.fixture
async def domain_data_mock(
hass: HomeAssistant, aioclient_mock, upnp_factory_mock
) -> AsyncGenerator[DlnaDmsData, None]:
"""Mock some global data used by this component.
This includes network clients and library object factories. Mocking it
prevents network use.
Yields the actual domain data, for ease of access
"""
with patch(
"homeassistant.components.dlna_dms.dms.AiohttpSessionRequester", autospec=True
):
yield get_domain_data(hass)
@pytest.fixture
def config_entry_mock() -> MockConfigEntry:
"""Mock a config entry for this platform."""
mock_entry = MockConfigEntry(
unique_id=MOCK_DEVICE_USN,
domain=DOMAIN,
data={
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_USN,
},
title=MOCK_DEVICE_NAME,
)
return mock_entry
@pytest.fixture
def dms_device_mock(upnp_factory_mock: Mock) -> Iterable[Mock]:
"""Mock the async_upnp_client DMS device, initially connected."""
with patch(
"homeassistant.components.dlna_dms.dms.DmsDevice", autospec=True
) as constructor:
device = constructor.return_value
device.on_event = None
device.profile_device = upnp_factory_mock.async_create_device.return_value
device.icon = MOCK_DEVICE_BASE_URL + "/icon.jpg"
device.udn = "device_udn"
device.manufacturer = "device_manufacturer"
device.model_name = "device_model_name"
device.name = "device_name"
device.get_absolute_url.side_effect = lambda url: absolute_url(
MOCK_DEVICE_BASE_URL, url
)
yield device
@pytest.fixture(autouse=True)
def ssdp_scanner_mock() -> Iterable[Mock]:
"""Mock the SSDP module."""
with patch("homeassistant.components.ssdp.Scanner", autospec=True) as mock_scanner:
reg_callback = mock_scanner.return_value.async_register_callback
reg_callback.return_value = Mock(return_value=None)
yield mock_scanner.return_value

View File

@ -0,0 +1,346 @@
"""Test the DLNA DMS config flow."""
from __future__ import annotations
import dataclasses
from typing import Final
from unittest.mock import Mock
from async_upnp_client import UpnpError
import pytest
from homeassistant import config_entries, data_entry_flow
from homeassistant.components import ssdp
from homeassistant.components.dlna_dms.const import DOMAIN
from homeassistant.const import CONF_DEVICE_ID, CONF_HOST, CONF_URL
from homeassistant.core import HomeAssistant
from .conftest import (
MOCK_DEVICE_HOST,
MOCK_DEVICE_LOCATION,
MOCK_DEVICE_NAME,
MOCK_DEVICE_TYPE,
MOCK_DEVICE_UDN,
MOCK_DEVICE_USN,
NEW_DEVICE_LOCATION,
)
from tests.common import MockConfigEntry
# Auto-use the domain_data_mock and dms_device_mock fixtures for every test in this module
pytestmark = [
pytest.mark.usefixtures("domain_data_mock"),
pytest.mark.usefixtures("dms_device_mock"),
]
WRONG_DEVICE_TYPE: Final = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
MOCK_ROOT_DEVICE_UDN: Final = "ROOT_DEVICE"
MOCK_DISCOVERY: Final = ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_st=MOCK_DEVICE_TYPE,
upnp={
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
ssdp.ATTR_UPNP_SERVICE_LIST: {
"service": [
{
"SCPDURL": "/ContentDirectory/scpd.xml",
"controlURL": "/ContentDirectory/control.xml",
"eventSubURL": "/ContentDirectory/event.xml",
"serviceId": "urn:upnp-org:serviceId:ContentDirectory",
"serviceType": "urn:schemas-upnp-org:service:ContentDirectory:1",
},
{
"SCPDURL": "/ConnectionManager/scpd.xml",
"controlURL": "/ConnectionManager/control.xml",
"eventSubURL": "/ConnectionManager/event.xml",
"serviceId": "urn:upnp-org:serviceId:ConnectionManager",
"serviceType": "urn:schemas-upnp-org:service:ConnectionManager:1",
},
]
},
},
x_homeassistant_matching_domains={DOMAIN},
)
async def test_user_flow(hass: HomeAssistant, ssdp_scanner_mock: Mock) -> None:
"""Test user-init'd flow, user selects discovered device."""
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[MOCK_DISCOVERY],
[],
[],
[],
]
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] is None
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_HOST: MOCK_DEVICE_HOST}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_USN,
}
assert result["options"] == {}
await hass.async_block_till_done()
async def test_user_flow_no_devices(
hass: HomeAssistant, ssdp_scanner_mock: Mock
) -> None:
"""Test user-init'd flow, there's really no devices to choose from."""
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[],
[],
[],
[],
]
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "no_devices_found"
async def test_ssdp_flow_success(hass: HomeAssistant) -> None:
"""Test that SSDP discovery with an available device works."""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_USN,
}
assert result["options"] == {}
async def test_ssdp_flow_unavailable(
hass: HomeAssistant, domain_data_mock: Mock
) -> None:
"""Test that SSDP discovery with an unavailable device still succeeds.
All the required information for configuration is obtained from the SSDP
message, there's no need to connect to the device to configure it.
"""
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "confirm"
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_USN,
}
assert result["options"] == {}
async def test_ssdp_flow_existing(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test that SSDP discovery of existing config entry updates the URL."""
config_entry_mock.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_st="mock_st",
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_udn=MOCK_DEVICE_UDN,
upnp={
ssdp.ATTR_UPNP_UDN: MOCK_ROOT_DEVICE_UDN,
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
},
),
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert config_entry_mock.data[CONF_URL] == NEW_DEVICE_LOCATION
async def test_ssdp_flow_duplicate_location(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test that discovery of device with URL matching existing entry gets aborted."""
config_entry_mock.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert config_entry_mock.data[CONF_URL] == MOCK_DEVICE_LOCATION
async def test_ssdp_flow_bad_data(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test bad SSDP discovery information is rejected cleanly."""
# Missing location
discovery = dataclasses.replace(MOCK_DISCOVERY, ssdp_location="")
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=discovery,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "bad_ssdp"
# Missing USN
discovery = dataclasses.replace(MOCK_DISCOVERY, ssdp_usn="")
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=discovery,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "bad_ssdp"
async def test_duplicate_name(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test device with name same as another results in no error."""
config_entry_mock.add_to_hass(hass)
mock_entry_1 = MockConfigEntry(
unique_id="mock_entry_1",
domain=DOMAIN,
data={
CONF_URL: "not-important",
CONF_DEVICE_ID: "not-important",
},
title=MOCK_DEVICE_NAME,
)
mock_entry_1.add_to_hass(hass)
# New UDN, USN, and location to be sure it's a new device
new_device_udn = "uuid:7bf34520-f034-4fa2-8d2d-2f709d422000"
new_device_usn = f"{new_device_udn}::{MOCK_DEVICE_TYPE}"
new_device_location = "http://192.88.99.22/dms_description.xml"
discovery = dataclasses.replace(
MOCK_DISCOVERY,
ssdp_usn=new_device_usn,
ssdp_location=new_device_location,
ssdp_udn=new_device_udn,
)
discovery.upnp = dict(discovery.upnp)
discovery.upnp[ssdp.ATTR_UPNP_UDN] = new_device_udn
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=discovery,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["step_id"] == "confirm"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={}
)
await hass.async_block_till_done()
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: new_device_location,
CONF_DEVICE_ID: new_device_usn,
}
assert result["options"] == {}
async def test_ssdp_flow_upnp_udn(
hass: HomeAssistant, config_entry_mock: MockConfigEntry
) -> None:
"""Test that SSDP discovery ignores the root device's UDN."""
config_entry_mock.add_to_hass(hass)
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_st=MOCK_DEVICE_TYPE,
upnp={
ssdp.ATTR_UPNP_UDN: "DIFFERENT_ROOT_DEVICE",
ssdp.ATTR_UPNP_DEVICE_TYPE: MOCK_DEVICE_TYPE,
ssdp.ATTR_UPNP_FRIENDLY_NAME: MOCK_DEVICE_NAME,
},
),
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_configured"
assert config_entry_mock.data[CONF_URL] == NEW_DEVICE_LOCATION
async def test_ssdp_missing_services(hass: HomeAssistant) -> None:
"""Test SSDP ignores devices that are missing required services."""
# No services defined at all
discovery = dataclasses.replace(MOCK_DISCOVERY)
discovery.upnp = dict(discovery.upnp)
del discovery.upnp[ssdp.ATTR_UPNP_SERVICE_LIST]
result = await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=discovery,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "not_dms"
# ContentDirectory service is missing
discovery = dataclasses.replace(MOCK_DISCOVERY)
discovery.upnp = dict(discovery.upnp)
discovery.upnp[ssdp.ATTR_UPNP_SERVICE_LIST] = {
"service": [
service
for service in discovery.upnp[ssdp.ATTR_UPNP_SERVICE_LIST]["service"]
if service.get("serviceId") != "urn:upnp-org:serviceId:ContentDirectory"
]
}
result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": config_entries.SOURCE_SSDP}, data=discovery
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "not_dms"

View File

@ -0,0 +1,705 @@
"""Test how the DmsDeviceSource handles available and unavailable devices."""
from __future__ import annotations
import asyncio
from collections.abc import AsyncIterable
import logging
from unittest.mock import ANY, DEFAULT, Mock, patch
from async_upnp_client.exceptions import UpnpConnectionError, UpnpError
from didl_lite import didl_lite
import pytest
from homeassistant.components import ssdp
from homeassistant.components.dlna_dms.const import DOMAIN
from homeassistant.components.dlna_dms.dms import DmsDeviceSource, get_domain_data
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source.error import Unresolvable
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from .conftest import (
MOCK_DEVICE_LOCATION,
MOCK_DEVICE_NAME,
MOCK_DEVICE_TYPE,
MOCK_DEVICE_UDN,
MOCK_DEVICE_USN,
NEW_DEVICE_LOCATION,
)
from tests.common import MockConfigEntry
# Auto-use the domain_data_mock for every test in this module
pytestmark = [
pytest.mark.usefixtures("domain_data_mock"),
]
async def setup_mock_component(
hass: HomeAssistant, mock_entry: MockConfigEntry
) -> DmsDeviceSource:
"""Set up a mock DlnaDmrEntity with the given configuration."""
mock_entry.add_to_hass(hass)
assert await async_setup_component(hass, DOMAIN, {}) is True
await hass.async_block_till_done()
domain_data = get_domain_data(hass)
return next(iter(domain_data.devices.values()))
@pytest.fixture
async def connected_source_mock(
hass: HomeAssistant,
config_entry_mock: MockConfigEntry,
ssdp_scanner_mock: Mock,
dms_device_mock: Mock,
) -> AsyncIterable[DmsDeviceSource]:
"""Fixture to set up a mock DmsDeviceSource in a connected state.
Yields the entity. Cleans up the entity after the test is complete.
"""
entity = await setup_mock_component(hass, config_entry_mock)
# Check the entity has registered all needed listeners
assert len(config_entry_mock.update_listeners) == 1
assert ssdp_scanner_mock.async_register_callback.await_count == 2
assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0
# Run the test
yield entity
# Unload config entry to clean up
assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == {
"require_restart": False
}
# Check entity has cleaned up its resources
assert not config_entry_mock.update_listeners
assert (
ssdp_scanner_mock.async_register_callback.await_count
== ssdp_scanner_mock.async_register_callback.return_value.call_count
)
@pytest.fixture
async def disconnected_source_mock(
hass: HomeAssistant,
upnp_factory_mock: Mock,
config_entry_mock: MockConfigEntry,
ssdp_scanner_mock: Mock,
dms_device_mock: Mock,
) -> AsyncIterable[DmsDeviceSource]:
"""Fixture to set up a mock DmsDeviceSource in a disconnected state.
Yields the entity. Cleans up the entity after the test is complete.
"""
# Cause the connection attempt to fail
upnp_factory_mock.async_create_device.side_effect = UpnpConnectionError
entity = await setup_mock_component(hass, config_entry_mock)
# Check the entity has registered all needed listeners
assert len(config_entry_mock.update_listeners) == 1
assert ssdp_scanner_mock.async_register_callback.await_count == 2
assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0
# Run the test
yield entity
# Unload config entry to clean up
assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == {
"require_restart": False
}
# Check entity has cleaned up its resources
assert not config_entry_mock.update_listeners
assert (
ssdp_scanner_mock.async_register_callback.await_count
== ssdp_scanner_mock.async_register_callback.return_value.call_count
)
async def test_unavailable_device(
hass: HomeAssistant,
upnp_factory_mock: Mock,
ssdp_scanner_mock: Mock,
config_entry_mock: MockConfigEntry,
) -> None:
"""Test a DlnaDmsEntity with out a connected DmsDevice."""
# Cause connection attempts to fail
upnp_factory_mock.async_create_device.side_effect = UpnpConnectionError
with patch(
"homeassistant.components.dlna_dms.dms.DmsDevice", autospec=True
) as dms_device_constructor_mock:
connected_source_mock = await setup_mock_component(hass, config_entry_mock)
# Check device is not created
dms_device_constructor_mock.assert_not_called()
# Check attempt was made to create a device from the supplied URL
upnp_factory_mock.async_create_device.assert_awaited_once_with(MOCK_DEVICE_LOCATION)
# Check SSDP notifications are registered
ssdp_scanner_mock.async_register_callback.assert_any_call(
ANY, {"USN": MOCK_DEVICE_USN}
)
ssdp_scanner_mock.async_register_callback.assert_any_call(
ANY, {"_udn": MOCK_DEVICE_UDN, "NTS": "ssdp:byebye"}
)
# Quick check of the state to verify the entity has no connected DmsDevice
assert not connected_source_mock.available
# Check the name matches that supplied
assert connected_source_mock.name == MOCK_DEVICE_NAME
# Check attempts to browse and resolve media give errors
with pytest.raises(BrowseError):
await connected_source_mock.async_browse_media("/browse_path")
with pytest.raises(BrowseError):
await connected_source_mock.async_browse_media(":browse_object")
with pytest.raises(BrowseError):
await connected_source_mock.async_browse_media("?browse_search")
with pytest.raises(Unresolvable):
await connected_source_mock.async_resolve_media("/resolve_path")
with pytest.raises(Unresolvable):
await connected_source_mock.async_resolve_media(":resolve_object")
with pytest.raises(Unresolvable):
await connected_source_mock.async_resolve_media("?resolve_search")
# Unload config entry to clean up
assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == {
"require_restart": False
}
# Confirm SSDP notifications unregistered
assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 2
async def test_become_available(
hass: HomeAssistant,
upnp_factory_mock: Mock,
ssdp_scanner_mock: Mock,
config_entry_mock: MockConfigEntry,
dms_device_mock: Mock,
) -> None:
"""Test a device becoming available after the entity is constructed."""
# Cause connection attempts to fail before adding the entity
upnp_factory_mock.async_create_device.side_effect = UpnpConnectionError
connected_source_mock = await setup_mock_component(hass, config_entry_mock)
assert not connected_source_mock.available
# Mock device is now available.
upnp_factory_mock.async_create_device.side_effect = None
upnp_factory_mock.async_create_device.reset_mock()
# Send an SSDP notification from the now alive device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
# Check device was created from the supplied URL
upnp_factory_mock.async_create_device.assert_awaited_once_with(NEW_DEVICE_LOCATION)
# Quick check of the state to verify the entity has a connected DmsDevice
assert connected_source_mock.available
# Unload config entry to clean up
assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == {
"require_restart": False
}
# Confirm SSDP notifications unregistered
assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 2
async def test_alive_but_gone(
hass: HomeAssistant,
upnp_factory_mock: Mock,
ssdp_scanner_mock: Mock,
disconnected_source_mock: DmsDeviceSource,
) -> None:
"""Test a device sending an SSDP alive announcement, but not being connectable."""
upnp_factory_mock.async_create_device.side_effect = UpnpError
# Send an SSDP notification from the still missing device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_st=MOCK_DEVICE_TYPE,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"},
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
# There should be a connection attempt to the device
upnp_factory_mock.async_create_device.assert_awaited()
# Device should still be unavailable
assert not disconnected_source_mock.available
# Send the same SSDP notification, expecting no extra connection attempts
upnp_factory_mock.async_create_device.reset_mock()
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_st=MOCK_DEVICE_TYPE,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"},
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
upnp_factory_mock.async_create_device.assert_not_called()
upnp_factory_mock.async_create_device.assert_not_awaited()
assert not disconnected_source_mock.available
# Send an SSDP notification with a new BOOTID, indicating the device has rebooted
upnp_factory_mock.async_create_device.reset_mock()
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_st=MOCK_DEVICE_TYPE,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"},
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
# Rebooted device (seen via BOOTID) should mean a new connection attempt
upnp_factory_mock.async_create_device.assert_awaited()
assert not disconnected_source_mock.available
# Send byebye message to indicate device is going away. Next alive message
# should result in a reconnect attempt even with same BOOTID.
upnp_factory_mock.async_create_device.reset_mock()
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.BYEBYE,
)
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_st=MOCK_DEVICE_TYPE,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"},
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
# Rebooted device (seen via byebye/alive) should mean a new connection attempt
upnp_factory_mock.async_create_device.assert_awaited()
assert not disconnected_source_mock.available
async def test_multiple_ssdp_alive(
hass: HomeAssistant,
upnp_factory_mock: Mock,
ssdp_scanner_mock: Mock,
disconnected_source_mock: DmsDeviceSource,
) -> None:
"""Test multiple SSDP alive notifications is ok, only connects to device once."""
upnp_factory_mock.async_create_device.reset_mock()
# Contacting the device takes long enough that 2 simultaneous attempts could be made
async def create_device_delayed(_location):
"""Delay before continuing with async_create_device.
This gives a chance for parallel calls to `device_connect` to occur.
"""
await asyncio.sleep(0.1)
return DEFAULT
upnp_factory_mock.async_create_device.side_effect = create_device_delayed
# Send two SSDP notifications with the new device URL
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=NEW_DEVICE_LOCATION,
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
# Check device is contacted exactly once
upnp_factory_mock.async_create_device.assert_awaited_once_with(NEW_DEVICE_LOCATION)
# Device should be available
assert disconnected_source_mock.available
async def test_ssdp_byebye(
hass: HomeAssistant,
ssdp_scanner_mock: Mock,
connected_source_mock: DmsDeviceSource,
) -> None:
"""Test device is disconnected when byebye is received."""
# First byebye will cause a disconnect
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_headers={"NTS": "ssdp:byebye"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.BYEBYE,
)
# Device should be gone
assert not connected_source_mock.available
# Second byebye will do nothing
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_headers={"NTS": "ssdp:byebye"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.BYEBYE,
)
async def test_ssdp_update_seen_bootid(
hass: HomeAssistant,
ssdp_scanner_mock: Mock,
upnp_factory_mock: Mock,
disconnected_source_mock: DmsDeviceSource,
) -> None:
"""Test device does not reconnect when it gets ssdp:update with next bootid."""
# Start with a disconnected device
entity = disconnected_source_mock
assert not entity.available
# "Reconnect" the device
upnp_factory_mock.async_create_device.reset_mock()
upnp_factory_mock.async_create_device.side_effect = None
# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
# Device should be connected
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send SSDP update with next boot ID
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_headers={
"NTS": "ssdp:update",
ssdp.ATTR_SSDP_BOOTID: "1",
ssdp.ATTR_SSDP_NEXTBOOTID: "2",
},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.UPDATE,
)
await hass.async_block_till_done()
# Device was not reconnected, even with a new boot ID
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send SSDP update with same next boot ID, again
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_headers={
"NTS": "ssdp:update",
ssdp.ATTR_SSDP_BOOTID: "1",
ssdp.ATTR_SSDP_NEXTBOOTID: "2",
},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.UPDATE,
)
await hass.async_block_till_done()
# Nothing should change
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send SSDP update with bad next boot ID
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_headers={
"NTS": "ssdp:update",
ssdp.ATTR_SSDP_BOOTID: "2",
ssdp.ATTR_SSDP_NEXTBOOTID: "7c848375-a106-4bd1-ac3c-8e50427c8e4f",
},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.UPDATE,
)
await hass.async_block_till_done()
# Nothing should change
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send a new SSDP alive with the new boot ID, device should not reconnect
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
async def test_ssdp_update_missed_bootid(
hass: HomeAssistant,
ssdp_scanner_mock: Mock,
upnp_factory_mock: Mock,
disconnected_source_mock: DmsDeviceSource,
) -> None:
"""Test device disconnects when it gets ssdp:update bootid it wasn't expecting."""
# Start with a disconnected device
entity = disconnected_source_mock
assert not entity.available
# "Reconnect" the device
upnp_factory_mock.async_create_device.reset_mock()
upnp_factory_mock.async_create_device.side_effect = None
# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
# Device should be connected
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send SSDP update with skipped boot ID (not previously seen)
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_udn=MOCK_DEVICE_UDN,
ssdp_headers={
"NTS": "ssdp:update",
ssdp.ATTR_SSDP_BOOTID: "2",
ssdp.ATTR_SSDP_NEXTBOOTID: "3",
},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.UPDATE,
)
await hass.async_block_till_done()
# Device should not *re*-connect yet
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send a new SSDP alive with the new boot ID, device should reconnect
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "3"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 2
async def test_ssdp_bootid(
hass: HomeAssistant,
upnp_factory_mock: Mock,
ssdp_scanner_mock: Mock,
disconnected_source_mock: DmsDeviceSource,
) -> None:
"""Test an alive with a new BOOTID.UPNP.ORG header causes a reconnect."""
# Start with a disconnected device
entity = disconnected_source_mock
assert not entity.available
# "Reconnect" the device
upnp_factory_mock.async_create_device.side_effect = None
upnp_factory_mock.async_create_device.reset_mock()
# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send SSDP alive with same boot ID, nothing should happen
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "1"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 1
# Send a new SSDP alive with an incremented boot ID, device should be dis/reconnected
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
ssdp_location=MOCK_DEVICE_LOCATION,
ssdp_headers={ssdp.ATTR_SSDP_BOOTID: "2"},
ssdp_st=MOCK_DEVICE_TYPE,
upnp={},
),
ssdp.SsdpChange.ALIVE,
)
await hass.async_block_till_done()
assert entity.available
assert upnp_factory_mock.async_create_device.await_count == 2
async def test_repeated_connect(
caplog: pytest.LogCaptureFixture,
connected_source_mock: DmsDeviceSource,
upnp_factory_mock: Mock,
) -> None:
"""Test trying to connect an already connected device is safely ignored."""
upnp_factory_mock.async_create_device.reset_mock()
# Calling internal function directly to skip trying to time 2 SSDP messages carefully
with caplog.at_level(logging.DEBUG):
await connected_source_mock.device_connect()
assert (
"Trying to connect when device already connected" == caplog.records[-1].message
)
assert not upnp_factory_mock.async_create_device.await_count
async def test_connect_no_location(
caplog: pytest.LogCaptureFixture,
disconnected_source_mock: DmsDeviceSource,
upnp_factory_mock: Mock,
) -> None:
"""Test trying to connect without a location is safely ignored."""
disconnected_source_mock.location = ""
upnp_factory_mock.async_create_device.reset_mock()
# Calling internal function directly to skip trying to time 2 SSDP messages carefully
with caplog.at_level(logging.DEBUG):
await disconnected_source_mock.device_connect()
assert "Not connecting because location is not known" == caplog.records[-1].message
assert not upnp_factory_mock.async_create_device.await_count
async def test_become_unavailable(
hass: HomeAssistant,
connected_source_mock: DmsDeviceSource,
dms_device_mock: Mock,
) -> None:
"""Test a device becoming unavailable."""
# Mock a good resolve result
dms_device_mock.async_browse_metadata.return_value = didl_lite.Item(
id="object_id",
restricted=False,
title="Object",
res=[didl_lite.Resource(uri="foo", protocol_info="http-get:*:audio/mpeg:")],
)
# Check async_resolve_object currently works
await connected_source_mock.async_resolve_media(":object_id")
# Now break the network connection
dms_device_mock.async_browse_metadata.side_effect = UpnpConnectionError
# The device should be considered available until next contacted
assert connected_source_mock.available
# async_resolve_object should fail
with pytest.raises(Unresolvable):
await connected_source_mock.async_resolve_media(":object_id")
# The device should now be unavailable
assert not connected_source_mock.available

View File

@ -0,0 +1,878 @@
"""Test the interface methods of DmsDeviceSource, except availability."""
from collections.abc import AsyncIterable
from typing import Final, Union
from unittest.mock import ANY, Mock, call
from async_upnp_client.exceptions import UpnpActionError, UpnpConnectionError, UpnpError
from async_upnp_client.profiles.dlna import ContentDirectoryErrorCode, DmsDevice
from didl_lite import didl_lite
import pytest
from homeassistant.components.dlna_dms.const import DOMAIN
from homeassistant.components.dlna_dms.dms import (
ActionError,
DeviceConnectionError,
DlnaDmsData,
DmsDeviceSource,
)
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source.error import Unresolvable
from homeassistant.components.media_source.models import BrowseMediaSource
from homeassistant.const import CONF_DEVICE_ID, CONF_URL
from homeassistant.core import HomeAssistant
from .conftest import (
MOCK_DEVICE_BASE_URL,
MOCK_DEVICE_NAME,
MOCK_DEVICE_TYPE,
MOCK_DEVICE_USN,
MOCK_SOURCE_ID,
)
from tests.common import MockConfigEntry
BrowseResultList = list[Union[didl_lite.DidlObject, didl_lite.Descriptor]]
@pytest.fixture
async def device_source_mock(
hass: HomeAssistant,
config_entry_mock: MockConfigEntry,
ssdp_scanner_mock: Mock,
dms_device_mock: Mock,
domain_data_mock: DlnaDmsData,
) -> AsyncIterable[DmsDeviceSource]:
"""Fixture to set up a DmsDeviceSource in a connected state and cleanup at completion."""
await hass.config_entries.async_add(config_entry_mock)
await hass.async_block_till_done()
mock_entity = domain_data_mock.devices[MOCK_DEVICE_USN]
# Check the DmsDeviceSource has registered all needed listeners
assert len(config_entry_mock.update_listeners) == 1
assert ssdp_scanner_mock.async_register_callback.await_count == 2
assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0
# Run the test
yield mock_entity
# Unload config entry to clean up
assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == {
"require_restart": False
}
# Check DmsDeviceSource has cleaned up its resources
assert not config_entry_mock.update_listeners
assert (
ssdp_scanner_mock.async_register_callback.await_count
== ssdp_scanner_mock.async_register_callback.return_value.call_count
)
assert MOCK_DEVICE_USN not in domain_data_mock.devices
assert MOCK_SOURCE_ID not in domain_data_mock.sources
async def test_update_source_id(
hass: HomeAssistant,
config_entry_mock: MockConfigEntry,
device_source_mock: DmsDeviceSource,
domain_data_mock: DlnaDmsData,
) -> None:
"""Test the config listener updates the source_id and source list upon title change."""
new_title: Final = "New Name"
new_source_id: Final = "new_name"
assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID}
hass.config_entries.async_update_entry(config_entry_mock, title=new_title)
await hass.async_block_till_done()
assert device_source_mock.source_id == new_source_id
assert domain_data_mock.sources.keys() == {new_source_id}
async def test_update_existing_source_id(
caplog: pytest.LogCaptureFixture,
hass: HomeAssistant,
config_entry_mock: MockConfigEntry,
device_source_mock: DmsDeviceSource,
domain_data_mock: DlnaDmsData,
) -> None:
"""Test the config listener gracefully handles colliding source_id."""
new_title: Final = "New Name"
new_source_id: Final = "new_name"
new_source_id_2: Final = "new_name_1"
# Set up another config entry to collide with the new source_id
colliding_entry = MockConfigEntry(
unique_id=f"different-udn::{MOCK_DEVICE_TYPE}",
domain=DOMAIN,
data={
CONF_URL: "http://192.88.99.22/dms_description.xml",
CONF_DEVICE_ID: f"different-udn::{MOCK_DEVICE_TYPE}",
},
title=new_title,
)
await hass.config_entries.async_add(colliding_entry)
await hass.async_block_till_done()
assert device_source_mock.source_id == MOCK_SOURCE_ID
assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID, new_source_id}
assert domain_data_mock.sources[MOCK_SOURCE_ID] is device_source_mock
# Update the existing entry to match the other entry's name
hass.config_entries.async_update_entry(config_entry_mock, title=new_title)
await hass.async_block_till_done()
# The existing device's source ID should be a newly generated slug
assert device_source_mock.source_id == new_source_id_2
assert domain_data_mock.sources.keys() == {new_source_id, new_source_id_2}
assert domain_data_mock.sources[new_source_id_2] is device_source_mock
# Changing back to the old name should not cause issues
hass.config_entries.async_update_entry(config_entry_mock, title=MOCK_DEVICE_NAME)
await hass.async_block_till_done()
assert device_source_mock.source_id == MOCK_SOURCE_ID
assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID, new_source_id}
assert domain_data_mock.sources[MOCK_SOURCE_ID] is device_source_mock
# Remove the collision and try again
await hass.config_entries.async_remove(colliding_entry.entry_id)
assert domain_data_mock.sources.keys() == {MOCK_SOURCE_ID}
hass.config_entries.async_update_entry(config_entry_mock, title=new_title)
await hass.async_block_till_done()
assert device_source_mock.source_id == new_source_id
assert domain_data_mock.sources.keys() == {new_source_id}
async def test_catch_request_error_unavailable(
device_source_mock: DmsDeviceSource,
) -> None:
"""Test the device is checked for availability before trying requests."""
device_source_mock._device = None
with pytest.raises(DeviceConnectionError):
await device_source_mock.async_resolve_object("id")
with pytest.raises(DeviceConnectionError):
await device_source_mock.async_resolve_path("path")
with pytest.raises(DeviceConnectionError):
await device_source_mock.async_resolve_search("query")
with pytest.raises(DeviceConnectionError):
await device_source_mock.async_browse_object("object_id")
with pytest.raises(DeviceConnectionError):
await device_source_mock.async_browse_search("query")
async def test_catch_request_error(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test errors when making requests to the device are handled."""
dms_device_mock.async_browse_metadata.side_effect = UpnpActionError(
error_code=ContentDirectoryErrorCode.NO_SUCH_OBJECT
)
with pytest.raises(ActionError, match="No such object: bad_id"):
await device_source_mock.async_resolve_media(":bad_id")
dms_device_mock.async_search_directory.side_effect = UpnpActionError(
error_code=ContentDirectoryErrorCode.INVALID_SEARCH_CRITERIA
)
with pytest.raises(ActionError, match="Invalid query: bad query"):
await device_source_mock.async_resolve_media("?bad query")
dms_device_mock.async_browse_metadata.side_effect = UpnpActionError(
error_code=ContentDirectoryErrorCode.CANNOT_PROCESS_REQUEST
)
with pytest.raises(DeviceConnectionError, match="Server failure: "):
await device_source_mock.async_resolve_media(":good_id")
dms_device_mock.async_browse_metadata.side_effect = UpnpError
with pytest.raises(
DeviceConnectionError, match="Server communication failure: UpnpError(.*)"
):
await device_source_mock.async_resolve_media(":bad_id")
# UpnpConnectionErrors will cause the device_source_mock to disconnect from the device
assert device_source_mock.available
dms_device_mock.async_browse_metadata.side_effect = UpnpConnectionError
with pytest.raises(
DeviceConnectionError, match="Server disconnected: UpnpConnectionError(.*)"
):
await device_source_mock.async_resolve_media(":bad_id")
assert not device_source_mock.available
async def test_icon(device_source_mock: DmsDeviceSource, dms_device_mock: Mock) -> None:
"""Test the device's icon URL is returned."""
assert device_source_mock.icon == dms_device_mock.icon
device_source_mock._device = None
assert device_source_mock.icon is None
async def test_resolve_media_invalid(device_source_mock: DmsDeviceSource) -> None:
"""Test async_resolve_media will raise Unresolvable when an identifier isn't supplied."""
with pytest.raises(Unresolvable, match="Invalid identifier.*"):
await device_source_mock.async_resolve_media("")
async def test_resolve_media_object(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test the async_resolve_object method via async_resolve_media."""
object_id: Final = "123"
res_url: Final = "foo/bar"
res_abs_url: Final = f"{MOCK_DEVICE_BASE_URL}/{res_url}"
res_mime: Final = "audio/mpeg"
# Success case: one resource
didl_item = didl_lite.Item(
id=object_id,
restricted="false",
title="Object",
res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")],
)
dms_device_mock.async_browse_metadata.return_value = didl_item
result = await device_source_mock.async_resolve_media(f":{object_id}")
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
object_id, metadata_filter="*"
)
assert result.url == res_abs_url
assert result.mime_type == res_mime
assert result.didl_metadata is didl_item
# Success case: two resources, first is playable
didl_item = didl_lite.Item(
id=object_id,
restricted="false",
title="Object",
res=[
didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:"),
didl_lite.Resource(
uri="thumbnail.png", protocol_info="http-get:*:image/png:"
),
],
)
dms_device_mock.async_browse_metadata.return_value = didl_item
result = await device_source_mock.async_resolve_media(f":{object_id}")
assert result.url == res_abs_url
assert result.mime_type == res_mime
assert result.didl_metadata is didl_item
# Success case: three resources, only third is playable
didl_item = didl_lite.Item(
id=object_id,
restricted="false",
title="Object",
res=[
didl_lite.Resource(uri="", protocol_info=""),
didl_lite.Resource(uri="internal:thing", protocol_info="internal:*::"),
didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:"),
],
)
dms_device_mock.async_browse_metadata.return_value = didl_item
result = await device_source_mock.async_resolve_media(f":{object_id}")
assert result.url == res_abs_url
assert result.mime_type == res_mime
assert result.didl_metadata is didl_item
# Failure case: no resources
didl_item = didl_lite.Item(
id=object_id,
restricted="false",
title="Object",
res=[],
)
dms_device_mock.async_browse_metadata.return_value = didl_item
with pytest.raises(Unresolvable, match="Object has no resources"):
await device_source_mock.async_resolve_media(f":{object_id}")
# Failure case: resources are not playable
didl_item = didl_lite.Item(
id=object_id,
restricted="false",
title="Object",
res=[didl_lite.Resource(uri="internal:thing", protocol_info="internal:*::")],
)
dms_device_mock.async_browse_metadata.return_value = didl_item
with pytest.raises(Unresolvable, match="Object has no playable resources"):
await device_source_mock.async_resolve_media(f":{object_id}")
async def test_resolve_media_path(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test the async_resolve_path method via async_resolve_media."""
path: Final = "path/to/thing"
object_ids: Final = ["path_id", "to_id", "thing_id"]
res_url: Final = "foo/bar"
res_abs_url: Final = f"{MOCK_DEVICE_BASE_URL}/{res_url}"
res_mime: Final = "audio/mpeg"
search_directory_result = []
for ob_id, ob_title in zip(object_ids, path.split("/")):
didl_item = didl_lite.Item(
id=ob_id,
restricted="false",
title=ob_title,
res=[],
)
search_directory_result.append(DmsDevice.BrowseResult([didl_item], 1, 1, 0))
# Test that path is resolved correctly
dms_device_mock.async_search_directory.side_effect = search_directory_result
dms_device_mock.async_browse_metadata.return_value = didl_lite.Item(
id=object_ids[-1],
restricted="false",
title="thing",
res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")],
)
result = await device_source_mock.async_resolve_media(f"/{path}")
assert dms_device_mock.async_search_directory.await_args_list == [
call(
parent_id,
search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"',
metadata_filter=["id", "upnp:class", "dc:title"],
requested_count=1,
)
for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/"))
]
assert result.url == res_abs_url
assert result.mime_type == res_mime
# Test a path starting with a / (first / is path action, second / is root of path)
dms_device_mock.async_search_directory.reset_mock()
dms_device_mock.async_search_directory.side_effect = search_directory_result
result = await device_source_mock.async_resolve_media(f"//{path}")
assert dms_device_mock.async_search_directory.await_args_list == [
call(
parent_id,
search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"',
metadata_filter=["id", "upnp:class", "dc:title"],
requested_count=1,
)
for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/"))
]
assert result.url == res_abs_url
assert result.mime_type == res_mime
async def test_resolve_path_simple(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_resolve_path for simple success as for test_resolve_media_path."""
path: Final = "path/to/thing"
object_ids: Final = ["path_id", "to_id", "thing_id"]
search_directory_result = []
for ob_id, ob_title in zip(object_ids, path.split("/")):
didl_item = didl_lite.Item(
id=ob_id,
restricted="false",
title=ob_title,
res=[],
)
search_directory_result.append(DmsDevice.BrowseResult([didl_item], 1, 1, 0))
dms_device_mock.async_search_directory.side_effect = search_directory_result
result = await device_source_mock.async_resolve_path(path)
assert dms_device_mock.async_search_directory.call_args_list == [
call(
parent_id,
search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"',
metadata_filter=["id", "upnp:class", "dc:title"],
requested_count=1,
)
for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/"))
]
assert result == object_ids[-1]
assert not dms_device_mock.async_browse_direct_children.await_count
async def test_resolve_path_browsed(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_resolve_path: action error results in browsing."""
path: Final = "path/to/thing"
object_ids: Final = ["path_id", "to_id", "thing_id"]
search_directory_result = []
for ob_id, ob_title in zip(object_ids, path.split("/")):
didl_item = didl_lite.Item(
id=ob_id,
restricted="false",
title=ob_title,
res=[],
)
search_directory_result.append(DmsDevice.BrowseResult([didl_item], 1, 1, 0))
dms_device_mock.async_search_directory.side_effect = [
search_directory_result[0],
# 2nd level can't be searched (this happens with Kodi)
UpnpActionError(),
search_directory_result[2],
]
browse_children_result: BrowseResultList = []
for title in ("Irrelevant", "to", "Ignored"):
browse_children_result.append(
didl_lite.Item(id=f"{title}_id", restricted="false", title=title, res=[])
)
dms_device_mock.async_browse_direct_children.side_effect = [
DmsDevice.BrowseResult(browse_children_result, 3, 3, 0)
]
result = await device_source_mock.async_resolve_path(path)
# All levels should have an attempted search
assert dms_device_mock.async_search_directory.await_args_list == [
call(
parent_id,
search_criteria=f'@parentID="{parent_id}" and dc:title="{title}"',
metadata_filter=["id", "upnp:class", "dc:title"],
requested_count=1,
)
for parent_id, title in zip(["0"] + object_ids[:-1], path.split("/"))
]
assert result == object_ids[-1]
# 2nd level should also be browsed
assert dms_device_mock.async_browse_direct_children.await_args_list == [
call("path_id", metadata_filter=["id", "upnp:class", "dc:title"])
]
async def test_resolve_path_browsed_nothing(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_resolve_path: action error results in browsing, but nothing found."""
dms_device_mock.async_search_directory.side_effect = UpnpActionError()
# No children
dms_device_mock.async_browse_direct_children.side_effect = [
DmsDevice.BrowseResult([], 0, 0, 0)
]
with pytest.raises(Unresolvable, match="No contents for thing in thing/other"):
await device_source_mock.async_resolve_path(r"thing/other")
# There are children, but they don't match
dms_device_mock.async_browse_direct_children.side_effect = [
DmsDevice.BrowseResult(
[
didl_lite.Item(
id="nothingid", restricted="false", title="not thing", res=[]
)
],
1,
1,
0,
)
]
with pytest.raises(Unresolvable, match="Nothing found for thing in thing/other"):
await device_source_mock.async_resolve_path(r"thing/other")
async def test_resolve_path_quoted(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_resolve_path: quotes and backslashes in the path get escaped correctly."""
dms_device_mock.async_search_directory.side_effect = [
DmsDevice.BrowseResult(
[
didl_lite.Item(
id=r'id_with quote" and back\slash',
restricted="false",
title="path",
res=[],
)
],
1,
1,
0,
),
UpnpError("Quick abort"),
]
with pytest.raises(DeviceConnectionError):
await device_source_mock.async_resolve_path(r'path/quote"back\slash')
assert dms_device_mock.async_search_directory.await_args_list == [
call(
"0",
search_criteria='@parentID="0" and dc:title="path"',
metadata_filter=["id", "upnp:class", "dc:title"],
requested_count=1,
),
call(
r'id_with quote" and back\slash',
search_criteria=r'@parentID="id_with quote\" and back\\slash" and dc:title="quote\"back\\slash"',
metadata_filter=["id", "upnp:class", "dc:title"],
requested_count=1,
),
]
async def test_resolve_path_ambiguous(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_resolve_path: ambiguous results (too many matches) gives error."""
dms_device_mock.async_search_directory.side_effect = [
DmsDevice.BrowseResult(
[
didl_lite.Item(
id=r"thing 1",
restricted="false",
title="thing",
res=[],
),
didl_lite.Item(
id=r"thing 2",
restricted="false",
title="thing",
res=[],
),
],
2,
2,
0,
)
]
with pytest.raises(
Unresolvable, match="Too many items found for thing in thing/other"
):
await device_source_mock.async_resolve_path(r"thing/other")
async def test_resolve_path_no_such_container(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_resolve_path: Explicit check for NO_SUCH_CONTAINER."""
dms_device_mock.async_search_directory.side_effect = UpnpActionError(
error_code=ContentDirectoryErrorCode.NO_SUCH_CONTAINER
)
with pytest.raises(Unresolvable, match="No such container: 0"):
await device_source_mock.async_resolve_path(r"thing/other")
async def test_resolve_media_search(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test the async_resolve_search method via async_resolve_media."""
res_url: Final = "foo/bar"
res_abs_url: Final = f"{MOCK_DEVICE_BASE_URL}/{res_url}"
res_mime: Final = "audio/mpeg"
# No results
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[], 0, 0, 0
)
with pytest.raises(Unresolvable, match='Nothing found for dc:title="thing"'):
await device_source_mock.async_resolve_media('?dc:title="thing"')
assert dms_device_mock.async_search_directory.await_args_list == [
call(
container_id="0",
search_criteria='dc:title="thing"',
metadata_filter="*",
requested_count=1,
)
]
# One result
dms_device_mock.async_search_directory.reset_mock()
didl_item = didl_lite.Item(
id="thing's id",
restricted="false",
title="thing",
res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")],
)
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[didl_item], 1, 1, 0
)
result = await device_source_mock.async_resolve_media('?dc:title="thing"')
assert result.url == res_abs_url
assert result.mime_type == res_mime
assert result.didl_metadata is didl_item
assert dms_device_mock.async_search_directory.await_count == 1
# Values should be taken from search result, not querying the item's metadata
assert dms_device_mock.async_browse_metadata.await_count == 0
# Two results - uses the first
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[didl_item], 1, 2, 0
)
result = await device_source_mock.async_resolve_media('?dc:title="thing"')
assert result.url == res_abs_url
assert result.mime_type == res_mime
assert result.didl_metadata is didl_item
# Bad result
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[didl_lite.Descriptor("id", "namespace")], 1, 1, 0
)
with pytest.raises(Unresolvable, match="Descriptor.* is not a DidlObject"):
await device_source_mock.async_resolve_media('?dc:title="thing"')
async def test_browse_media_root(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_browse_media with no identifier will browse the root of the device."""
dms_device_mock.async_browse_metadata.return_value = didl_lite.DidlObject(
id="0", restricted="false", title="root"
)
dms_device_mock.async_browse_direct_children.return_value = DmsDevice.BrowseResult(
[], 0, 0, 0
)
# No identifier (first opened in media browser)
result = await device_source_mock.async_browse_media(None)
assert result.identifier == f"{MOCK_SOURCE_ID}/:0"
assert result.title == MOCK_DEVICE_NAME
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
"0", metadata_filter=ANY
)
dms_device_mock.async_browse_direct_children.assert_awaited_once_with(
"0", metadata_filter=ANY, sort_criteria=ANY
)
dms_device_mock.async_browse_metadata.reset_mock()
dms_device_mock.async_browse_direct_children.reset_mock()
# Empty string identifier
result = await device_source_mock.async_browse_media("")
assert result.identifier == f"{MOCK_SOURCE_ID}/:0"
assert result.title == MOCK_DEVICE_NAME
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
"0", metadata_filter=ANY
)
dms_device_mock.async_browse_direct_children.assert_awaited_once_with(
"0", metadata_filter=ANY, sort_criteria=ANY
)
async def test_browse_media_object(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_browse_object via async_browse_media."""
object_id = "1234"
child_titles = ("Item 1", "Thing", "Item 2")
dms_device_mock.async_browse_metadata.return_value = didl_lite.Container(
id=object_id, restricted="false", title="subcontainer"
)
children_result = DmsDevice.BrowseResult([], 3, 3, 0)
for title in child_titles:
children_result.result.append(
didl_lite.Item(
id=title + "_id",
restricted="false",
title=title,
res=[
didl_lite.Resource(
uri=title + "_url", protocol_info="http-get:*:audio/mpeg:"
)
],
),
)
dms_device_mock.async_browse_direct_children.return_value = children_result
result = await device_source_mock.async_browse_media(f":{object_id}")
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
object_id, metadata_filter=ANY
)
dms_device_mock.async_browse_direct_children.assert_awaited_once_with(
object_id, metadata_filter=ANY, sort_criteria=ANY
)
assert result.domain == DOMAIN
assert result.identifier == f"{MOCK_SOURCE_ID}/:{object_id}"
assert result.title == "subcontainer"
assert not result.can_play
assert result.can_expand
assert result.children
for child, title in zip(result.children, child_titles):
assert isinstance(child, BrowseMediaSource)
assert child.identifier == f"{MOCK_SOURCE_ID}/:{title}_id"
assert child.title == title
assert child.can_play
assert not child.can_expand
assert not child.children
async def test_browse_media_path(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_browse_media with a path."""
title = "folder"
con_id = "123"
container = didl_lite.Container(id=con_id, restricted="false", title=title)
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[container], 1, 1, 0
)
dms_device_mock.async_browse_metadata.return_value = container
dms_device_mock.async_browse_direct_children.return_value = DmsDevice.BrowseResult(
[], 0, 0, 0
)
result = await device_source_mock.async_browse_media(f"{title}")
assert result.identifier == f"{MOCK_SOURCE_ID}/:{con_id}"
assert result.title == title
dms_device_mock.async_search_directory.assert_awaited_once_with(
"0",
search_criteria=f'@parentID="0" and dc:title="{title}"',
metadata_filter=ANY,
requested_count=1,
)
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
con_id, metadata_filter=ANY
)
dms_device_mock.async_browse_direct_children.assert_awaited_once_with(
con_id, metadata_filter=ANY, sort_criteria=ANY
)
async def test_browse_media_search(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test async_browse_media with a search query."""
query = 'dc:title contains "FooBar"'
object_details = (("111", "FooBar baz"), ("432", "Not FooBar"), ("99", "FooBar"))
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[
didl_lite.DidlObject(id=ob_id, restricted="false", title=title)
for ob_id, title in object_details
],
3,
3,
0,
)
# Test that descriptors are skipped
dms_device_mock.async_search_directory.return_value.result.insert(
1, didl_lite.Descriptor("id", "name_space")
)
result = await device_source_mock.async_browse_media(f"?{query}")
assert result.identifier == f"{MOCK_SOURCE_ID}/?{query}"
assert result.title == "Search results"
assert result.children
for obj, child in zip(object_details, result.children):
assert isinstance(child, BrowseMediaSource)
assert child.identifier == f"{MOCK_SOURCE_ID}/:{obj[0]}"
assert child.title == obj[1]
assert not child.children
async def test_browse_search_invalid(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test searching with an invalid query gives a BrowseError."""
query = "title == FooBar"
dms_device_mock.async_search_directory.side_effect = UpnpActionError(
error_code=ContentDirectoryErrorCode.INVALID_SEARCH_CRITERIA
)
with pytest.raises(BrowseError, match=f"Invalid query: {query}"):
await device_source_mock.async_browse_media(f"?{query}")
async def test_browse_search_no_results(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test a search with no results does not give an error."""
query = 'dc:title contains "FooBar"'
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[], 0, 0, 0
)
result = await device_source_mock.async_browse_media(f"?{query}")
assert result.identifier == f"{MOCK_SOURCE_ID}/?{query}"
assert result.title == "Search results"
assert not result.children
async def test_thumbnail(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test getting thumbnails URLs for items."""
# Use browse_search to get multiple items at once for least effort
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
[
# Thumbnail as albumArtURI property
didl_lite.MusicAlbum(
id="a",
restricted="false",
title="a",
res=[],
album_art_uri="a_thumb.jpg",
),
# Thumbnail as resource (1st resource is media item, 2nd is missing
# a URI, 3rd is thumbnail)
didl_lite.MusicTrack(
id="b",
restricted="false",
title="b",
res=[
didl_lite.Resource(
uri="b_track.mp3", protocol_info="http-get:*:audio/mpeg:"
),
didl_lite.Resource(uri="", protocol_info="internal:*::"),
didl_lite.Resource(
uri="b_thumb.png", protocol_info="http-get:*:image/png:"
),
],
),
# No thumbnail
didl_lite.MusicTrack(
id="c",
restricted="false",
title="c",
res=[
didl_lite.Resource(
uri="c_track.mp3", protocol_info="http-get:*:audio/mpeg:"
)
],
),
],
3,
3,
0,
)
result = await device_source_mock.async_browse_media("?query")
assert result.children
assert result.children[0].thumbnail == f"{MOCK_DEVICE_BASE_URL}/a_thumb.jpg"
assert result.children[1].thumbnail == f"{MOCK_DEVICE_BASE_URL}/b_thumb.png"
assert result.children[2].thumbnail is None
async def test_can_play(
device_source_mock: DmsDeviceSource, dms_device_mock: Mock
) -> None:
"""Test determination of playability for items."""
protocol_infos = [
# No protocol info for resource
("", True),
# Protocol info is poorly formatted but can play
("http-get", True),
# Protocol info is poorly formatted and can't play
("internal", False),
# Protocol is HTTP
("http-get:*:audio/mpeg", True),
# Protocol is RTSP
("rtsp-rtp-udp:*:MPA:", True),
# Protocol is something else
("internal:*:audio/mpeg:", False),
]
search_results: BrowseResultList = []
# No resources
search_results.append(didl_lite.DidlObject(id="", restricted="false", title=""))
search_results.extend(
didl_lite.MusicTrack(
id="",
restricted="false",
title="",
res=[didl_lite.Resource(uri="", protocol_info=protocol_info)],
)
for protocol_info, _ in protocol_infos
)
# Use browse_search to get multiple items at once for least effort
dms_device_mock.async_search_directory.return_value = DmsDevice.BrowseResult(
search_results, len(search_results), len(search_results), 0
)
result = await device_source_mock.async_browse_media("?query")
assert result.children
assert not result.children[0].can_play
for idx, info_can_play in enumerate(protocol_infos):
protocol_info, can_play = info_can_play
assert result.children[idx + 1].can_play is can_play, f"Checked {protocol_info}"

View File

@ -0,0 +1,59 @@
"""Test the DLNA DMS component setup, cleanup, and module-level functions."""
from unittest.mock import Mock
from homeassistant.components.dlna_dms.const import DOMAIN
from homeassistant.components.dlna_dms.dms import DlnaDmsData
from homeassistant.core import HomeAssistant
from homeassistant.setup import async_setup_component
from tests.common import MockConfigEntry
async def test_resource_lifecycle(
hass: HomeAssistant,
domain_data_mock: DlnaDmsData,
config_entry_mock: MockConfigEntry,
ssdp_scanner_mock: Mock,
dms_device_mock: Mock,
) -> None:
"""Test that resources are acquired/released as the entity is setup/unloaded."""
# Set up the config entry
config_entry_mock.add_to_hass(hass)
assert await async_setup_component(hass, DOMAIN, {}) is True
await hass.async_block_till_done()
# Check the entity is created and working
assert len(domain_data_mock.devices) == 1
assert len(domain_data_mock.sources) == 1
entity = next(iter(domain_data_mock.devices.values()))
assert entity.available is True
# Check update listeners are subscribed
assert len(config_entry_mock.update_listeners) == 1
assert ssdp_scanner_mock.async_register_callback.await_count == 2
assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 0
# Check event notifiers are not subscribed - dlna_dms doesn't use them
assert dms_device_mock.async_subscribe_services.await_count == 0
assert dms_device_mock.async_unsubscribe_services.await_count == 0
assert dms_device_mock.on_event is None
# Unload the config entry
assert await hass.config_entries.async_remove(config_entry_mock.entry_id) == {
"require_restart": False
}
# Check update listeners are released
assert not config_entry_mock.update_listeners
assert ssdp_scanner_mock.async_register_callback.await_count == 2
assert ssdp_scanner_mock.async_register_callback.return_value.call_count == 2
# Check event notifiers are still not subscribed
assert dms_device_mock.async_subscribe_services.await_count == 0
assert dms_device_mock.async_unsubscribe_services.await_count == 0
assert dms_device_mock.on_event is None
# Check entity is gone
assert not domain_data_mock.devices
assert not domain_data_mock.sources

View File

@ -0,0 +1,255 @@
"""Tests for dlna_dms.media_source, mostly testing DmsMediaSource."""
from unittest.mock import ANY, Mock
from async_upnp_client.exceptions import UpnpError
from didl_lite import didl_lite
import pytest
from homeassistant.components.dlna_dms.const import DOMAIN
from homeassistant.components.dlna_dms.dms import DlnaDmsData, DmsDeviceSource
from homeassistant.components.dlna_dms.media_source import (
DmsMediaSource,
async_get_media_source,
)
from homeassistant.components.media_player.errors import BrowseError
from homeassistant.components.media_source.error import Unresolvable
from homeassistant.components.media_source.models import (
BrowseMediaSource,
MediaSourceItem,
)
from homeassistant.const import CONF_DEVICE_ID, CONF_URL
from homeassistant.core import HomeAssistant
from .conftest import (
MOCK_DEVICE_BASE_URL,
MOCK_DEVICE_NAME,
MOCK_DEVICE_TYPE,
MOCK_DEVICE_USN,
MOCK_SOURCE_ID,
)
from tests.common import MockConfigEntry
@pytest.fixture
async def entity(
hass: HomeAssistant,
config_entry_mock: MockConfigEntry,
dms_device_mock: Mock,
domain_data_mock: DlnaDmsData,
) -> DmsDeviceSource:
"""Fixture to set up a DmsDeviceSource in a connected state and cleanup at completion."""
await hass.config_entries.async_add(config_entry_mock)
await hass.async_block_till_done()
return domain_data_mock.devices[MOCK_DEVICE_USN]
@pytest.fixture
async def dms_source(hass: HomeAssistant, entity: DmsDeviceSource) -> DmsMediaSource:
"""Fixture providing a pre-constructed DmsMediaSource with a single device."""
return DmsMediaSource(hass)
async def test_get_media_source(hass: HomeAssistant) -> None:
"""Test the async_get_media_source function and DmsMediaSource constructor."""
source = await async_get_media_source(hass)
assert isinstance(source, DmsMediaSource)
assert source.domain == DOMAIN
async def test_resolve_media_unconfigured(hass: HomeAssistant) -> None:
"""Test resolve_media without any devices being configured."""
source = DmsMediaSource(hass)
item = MediaSourceItem(hass, DOMAIN, "source_id/media_id")
with pytest.raises(Unresolvable, match="No sources have been configured"):
await source.async_resolve_media(item)
async def test_resolve_media_bad_identifier(
hass: HomeAssistant, dms_source: DmsMediaSource
) -> None:
"""Test trying to resolve an item that has an unresolvable identifier."""
# Empty identifier
item = MediaSourceItem(hass, DOMAIN, "")
with pytest.raises(Unresolvable, match="No source ID.*"):
await dms_source.async_resolve_media(item)
# Identifier has media_id but no source_id
item = MediaSourceItem(hass, DOMAIN, "/media_id")
with pytest.raises(Unresolvable, match="No source ID.*"):
await dms_source.async_resolve_media(item)
# Identifier has source_id but no media_id
item = MediaSourceItem(hass, DOMAIN, "source_id/")
with pytest.raises(Unresolvable, match="No media ID.*"):
await dms_source.async_resolve_media(item)
# Identifier is missing source_id/media_id separator
item = MediaSourceItem(hass, DOMAIN, "source_id")
with pytest.raises(Unresolvable, match="No media ID.*"):
await dms_source.async_resolve_media(item)
# Identifier has an unknown source_id
item = MediaSourceItem(hass, DOMAIN, "unknown_source/media_id")
with pytest.raises(Unresolvable, match="Unknown source ID: unknown_source"):
await dms_source.async_resolve_media(item)
async def test_resolve_media_success(
hass: HomeAssistant, dms_source: DmsMediaSource, dms_device_mock: Mock
) -> None:
"""Test resolving an item via a DmsDeviceSource."""
object_id = "123"
item = MediaSourceItem(hass, DOMAIN, f"{MOCK_SOURCE_ID}/:{object_id}")
res_url = "foo/bar"
res_mime = "audio/mpeg"
didl_item = didl_lite.Item(
id=object_id,
restricted=False,
title="Object",
res=[didl_lite.Resource(uri=res_url, protocol_info=f"http-get:*:{res_mime}:")],
)
dms_device_mock.async_browse_metadata.return_value = didl_item
result = await dms_source.async_resolve_media(item)
assert result.url == f"{MOCK_DEVICE_BASE_URL}/{res_url}"
assert result.mime_type == res_mime
assert result.didl_metadata is didl_item
async def test_browse_media_unconfigured(hass: HomeAssistant) -> None:
"""Test browse_media without any devices being configured."""
source = DmsMediaSource(hass)
item = MediaSourceItem(hass, DOMAIN, "source_id/media_id")
with pytest.raises(BrowseError, match="No sources have been configured"):
await source.async_browse_media(item)
item = MediaSourceItem(hass, DOMAIN, "")
with pytest.raises(BrowseError, match="No sources have been configured"):
await source.async_browse_media(item)
async def test_browse_media_bad_identifier(
hass: HomeAssistant, dms_source: DmsMediaSource
) -> None:
"""Test browse_media with a bad source_id."""
item = MediaSourceItem(hass, DOMAIN, "bad-id/media_id")
with pytest.raises(BrowseError, match="Unknown source ID: bad-id"):
await dms_source.async_browse_media(item)
async def test_browse_media_single_source_no_identifier(
hass: HomeAssistant, dms_source: DmsMediaSource, dms_device_mock: Mock
) -> None:
"""Test browse_media without a source_id, with a single device registered."""
# Fast bail-out, mock will be checked after
dms_device_mock.async_browse_metadata.side_effect = UpnpError
# No source_id nor media_id
item = MediaSourceItem(hass, DOMAIN, "")
with pytest.raises(BrowseError):
await dms_source.async_browse_media(item)
# Mock device should've been browsed for the root directory
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
"0", metadata_filter=ANY
)
# No source_id but a media_id
item = MediaSourceItem(hass, DOMAIN, "/:media-item-id")
dms_device_mock.async_browse_metadata.reset_mock()
with pytest.raises(BrowseError):
await dms_source.async_browse_media(item)
# Mock device should've been browsed for the root directory
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
"media-item-id", metadata_filter=ANY
)
async def test_browse_media_multiple_sources(
hass: HomeAssistant, dms_source: DmsMediaSource, dms_device_mock: Mock
) -> None:
"""Test browse_media without a source_id, with multiple devices registered."""
# Set up a second source
other_source_id = "second_source"
other_source_title = "Second source"
other_config_entry = MockConfigEntry(
unique_id=f"different-udn::{MOCK_DEVICE_TYPE}",
domain=DOMAIN,
data={
CONF_URL: "http://192.88.99.22/dms_description.xml",
CONF_DEVICE_ID: f"different-udn::{MOCK_DEVICE_TYPE}",
},
title=other_source_title,
)
await hass.config_entries.async_add(other_config_entry)
await hass.async_block_till_done()
# No source_id nor media_id
item = MediaSourceItem(hass, DOMAIN, "")
result = await dms_source.async_browse_media(item)
# Mock device should not have been browsed
assert dms_device_mock.async_browse_metadata.await_count == 0
# Result will be a list of available devices
assert result.title == "DLNA Servers"
assert result.children
assert isinstance(result.children[0], BrowseMediaSource)
assert result.children[0].identifier == f"{MOCK_SOURCE_ID}/:0"
assert result.children[0].title == MOCK_DEVICE_NAME
assert isinstance(result.children[1], BrowseMediaSource)
assert result.children[1].identifier == f"{other_source_id}/:0"
assert result.children[1].title == other_source_title
# No source_id but a media_id - will give the exact same list of all devices
item = MediaSourceItem(hass, DOMAIN, "/:media-item-id")
result = await dms_source.async_browse_media(item)
# Mock device should not have been browsed
assert dms_device_mock.async_browse_metadata.await_count == 0
# Result will be a list of available devices
assert result.title == "DLNA Servers"
assert result.children
assert isinstance(result.children[0], BrowseMediaSource)
assert result.children[0].identifier == f"{MOCK_SOURCE_ID}/:0"
assert result.children[0].title == MOCK_DEVICE_NAME
assert isinstance(result.children[1], BrowseMediaSource)
assert result.children[1].identifier == f"{other_source_id}/:0"
assert result.children[1].title == other_source_title
async def test_browse_media_source_id(
hass: HomeAssistant,
config_entry_mock: MockConfigEntry,
dms_device_mock: Mock,
domain_data_mock: DlnaDmsData,
) -> None:
"""Test browse_media with an explicit source_id."""
# Set up a second device first, then the primary mock device.
# This allows testing that the right source is chosen by source_id
other_source_title = "Second source"
other_config_entry = MockConfigEntry(
unique_id=f"different-udn::{MOCK_DEVICE_TYPE}",
domain=DOMAIN,
data={
CONF_URL: "http://192.88.99.22/dms_description.xml",
CONF_DEVICE_ID: f"different-udn::{MOCK_DEVICE_TYPE}",
},
title=other_source_title,
)
await hass.config_entries.async_add(other_config_entry)
await hass.async_block_till_done()
await hass.config_entries.async_add(config_entry_mock)
await hass.async_block_till_done()
# Fast bail-out, mock will be checked after
dms_device_mock.async_browse_metadata.side_effect = UpnpError
# Browse by source_id
item = MediaSourceItem(hass, DOMAIN, f"{MOCK_SOURCE_ID}/:media-item-id")
dms_source = DmsMediaSource(hass)
with pytest.raises(BrowseError):
await dms_source.async_browse_media(item)
# Mock device should've been browsed for the root directory
dms_device_mock.async_browse_metadata.assert_awaited_once_with(
"media-item-id", metadata_filter=ANY
)