Add Upnp volume control/status to SamsungTV (#68663)

Co-authored-by: epenet <epenet@users.noreply.github.com>
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
epenet
2022-03-28 00:27:24 +02:00
committed by GitHub
parent b5496441ae
commit c024033dae
7 changed files with 240 additions and 16 deletions

View File

@ -6,7 +6,8 @@
"getmac==0.8.2",
"samsungctl[websocket]==0.7.1",
"samsungtvws[async,encrypted]==2.5.0",
"wakeonlan==2.0.1"
"wakeonlan==2.0.1",
"async-upnp-client==0.27.0"
],
"ssdp": [
{

View File

@ -2,9 +2,15 @@
from __future__ import annotations
import asyncio
from collections.abc import Coroutine
import contextlib
from datetime import datetime, timedelta
from typing import Any
from async_upnp_client.aiohttp import AiohttpSessionRequester
from async_upnp_client.client import UpnpDevice, UpnpService
from async_upnp_client.client_factory import UpnpFactory
from async_upnp_client.exceptions import UpnpActionResponseError, UpnpConnectionError
import voluptuous as vol
from wakeonlan import send_magic_packet
@ -24,12 +30,14 @@ from homeassistant.components.media_player.const import (
SUPPORT_TURN_OFF,
SUPPORT_TURN_ON,
SUPPORT_VOLUME_MUTE,
SUPPORT_VOLUME_SET,
SUPPORT_VOLUME_STEP,
)
from homeassistant.config_entries import SOURCE_REAUTH, ConfigEntry
from homeassistant.const import CONF_HOST, CONF_MAC, CONF_NAME, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers import entity_component
from homeassistant.helpers.aiohttp_client import async_get_clientsession
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.entity import DeviceInfo
@ -42,9 +50,11 @@ from .const import (
CONF_MANUFACTURER,
CONF_MODEL,
CONF_ON_ACTION,
CONF_SSDP_RENDERING_CONTROL_LOCATION,
DEFAULT_NAME,
DOMAIN,
LOGGER,
UPNP_SVC_RENDERINGCONTROL,
)
SOURCES = {"TV": "KEY_TV", "HDMI": "KEY_HDMI"}
@ -104,6 +114,9 @@ class SamsungTVDevice(MediaPlayerEntity):
self._config_entry = config_entry
self._host: str | None = config_entry.data[CONF_HOST]
self._mac: str | None = config_entry.data.get(CONF_MAC)
self._ssdp_rendering_control_location = config_entry.data.get(
CONF_SSDP_RENDERING_CONTROL_LOCATION
)
self._on_script = on_script
# Assume that the TV is in Play mode
self._playing: bool = True
@ -121,6 +134,8 @@ class SamsungTVDevice(MediaPlayerEntity):
if self._on_script or self._mac:
# Add turn-on if on_script or mac is available
self._attr_supported_features |= SUPPORT_TURN_ON
if self._ssdp_rendering_control_location:
self._attr_supported_features |= SUPPORT_VOLUME_SET
self._attr_device_info = DeviceInfo(
name=self.name,
@ -142,6 +157,8 @@ class SamsungTVDevice(MediaPlayerEntity):
self._bridge.register_reauth_callback(self.access_denied)
self._bridge.register_app_list_callback(self._app_list_callback)
self._upnp_device: UpnpDevice | None = None
def _update_sources(self) -> None:
self._attr_source_list = list(SOURCES)
if app_list := self._app_list:
@ -179,21 +196,77 @@ class SamsungTVDevice(MediaPlayerEntity):
STATE_ON if await self._bridge.async_is_on() else STATE_OFF
)
if self._attr_state == STATE_ON and not self._app_list_event.is_set():
await self._bridge.async_request_app_list()
if self._app_list_event.is_set():
# The try+wait_for is a bit expensive so we should try not to
# enter it unless we have to (Python 3.11 will have zero cost try)
return
try:
await asyncio.wait_for(self._app_list_event.wait(), APP_LIST_DELAY)
except asyncio.TimeoutError as err:
# No need to try again
self._app_list_event.set()
LOGGER.debug(
"Failed to load app list from %s: %s", self._host, err.__repr__()
if self._attr_state != STATE_ON:
return
startup_tasks: list[Coroutine[Any, Any, None]] = []
if not self._app_list_event.is_set():
startup_tasks.append(self._async_startup_app_list())
if not self._upnp_device and self._ssdp_rendering_control_location:
startup_tasks.append(self._async_startup_upnp())
if startup_tasks:
await asyncio.gather(*startup_tasks)
if not (service := self._get_upnp_service()):
return
get_volume, get_mute = await asyncio.gather(
service.action("GetVolume").async_call(InstanceID=0, Channel="Master"),
service.action("GetMute").async_call(InstanceID=0, Channel="Master"),
)
LOGGER.debug("Upnp GetVolume on %s: %s", self._host, get_volume)
if (volume_level := get_volume.get("CurrentVolume")) is not None:
self._attr_volume_level = volume_level / 100
LOGGER.debug("Upnp GetMute on %s: %s", self._host, get_mute)
if (is_muted := get_mute.get("CurrentMute")) is not None:
self._attr_is_volume_muted = is_muted
async def _async_startup_app_list(self) -> None:
await self._bridge.async_request_app_list()
if self._app_list_event.is_set():
# The try+wait_for is a bit expensive so we should try not to
# enter it unless we have to (Python 3.11 will have zero cost try)
return
try:
await asyncio.wait_for(self._app_list_event.wait(), APP_LIST_DELAY)
except asyncio.TimeoutError as err:
# No need to try again
self._app_list_event.set()
LOGGER.debug(
"Failed to load app list from %s: %s", self._host, err.__repr__()
)
async def _async_startup_upnp(self) -> None:
assert self._ssdp_rendering_control_location is not None
if self._upnp_device is None:
session = async_get_clientsession(self.hass)
upnp_requester = AiohttpSessionRequester(session)
upnp_factory = UpnpFactory(upnp_requester)
with contextlib.suppress(UpnpConnectionError):
self._upnp_device = await upnp_factory.async_create_device(
self._ssdp_rendering_control_location
)
def _get_upnp_service(self, log: bool = False) -> UpnpService | None:
if self._upnp_device is None:
if log:
LOGGER.info("Upnp services are not available on %s", self._host)
return None
if service := self._upnp_device.services.get(UPNP_SVC_RENDERINGCONTROL):
return service
if log:
LOGGER.info(
"Upnp service %s is not available on %s",
UPNP_SVC_RENDERINGCONTROL,
self._host,
)
return None
async def _async_launch_app(self, app_id: str) -> None:
"""Send launch_app to the tv."""
if self._power_off_in_progress():
@ -233,6 +306,19 @@ class SamsungTVDevice(MediaPlayerEntity):
self._end_of_power_off = dt_util.utcnow() + SCAN_INTERVAL_PLUS_OFF_TIME
await self._bridge.async_power_off()
async def async_set_volume_level(self, volume: float) -> None:
"""Set volume level on the media player."""
if not (service := self._get_upnp_service(log=True)):
return
try:
await service.action("SetVolume").async_call(
InstanceID=0, Channel="Master", DesiredVolume=int(volume * 100)
)
except UpnpActionResponseError as err:
LOGGER.warning(
"Unable to set volume level on %s: %s", self._host, err.__repr__()
)
async def async_volume_up(self) -> None:
"""Volume up the media player."""
await self._async_send_keys(["KEY_VOLUP"])

View File

@ -326,6 +326,7 @@ asterisk_mbox==0.5.0
# homeassistant.components.dlna_dmr
# homeassistant.components.dlna_dms
# homeassistant.components.samsungtv
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight

View File

@ -259,6 +259,7 @@ arcam-fmj==0.12.0
# homeassistant.components.dlna_dmr
# homeassistant.components.dlna_dms
# homeassistant.components.samsungtv
# homeassistant.components.ssdp
# homeassistant.components.upnp
# homeassistant.components.yeelight

View File

@ -1,6 +1,10 @@
"""Tests for the samsungtv component."""
from __future__ import annotations
from unittest.mock import Mock
from async_upnp_client.client import UpnpAction, UpnpService
from homeassistant.components.samsungtv.const import DOMAIN
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
@ -21,3 +25,24 @@ async def setup_samsungtv_entry(hass: HomeAssistant, data: ConfigType) -> Config
await hass.async_block_till_done()
return entry
def upnp_get_action_mock(device: Mock, service_type: str, action: str) -> Mock:
"""Get or Add UpnpService/UpnpAction to UpnpDevice mock."""
upnp_service: Mock | None
if (upnp_service := device.services.get(service_type)) is None:
upnp_service = Mock(UpnpService)
upnp_service.actions = {}
def _get_action(action: str):
return upnp_service.actions.get(action)
upnp_service.action.side_effect = _get_action
device.services[service_type] = upnp_service
upnp_action: Mock | None
if (upnp_action := upnp_service.actions.get(action)) is None:
upnp_action = Mock(UpnpAction)
upnp_service.actions[action] = upnp_action
return upnp_action

View File

@ -6,6 +6,8 @@ from datetime import datetime
from typing import Any
from unittest.mock import AsyncMock, Mock, patch
from async_upnp_client.client import UpnpDevice
from async_upnp_client.exceptions import UpnpConnectionError
import pytest
from samsungctl import Remote
from samsungtvws.async_remote import SamsungTVWSAsyncRemote
@ -38,6 +40,28 @@ def app_list_delay_fixture() -> None:
yield
@pytest.fixture(name="upnp_factory", autouse=True)
def upnp_factory_fixture() -> Mock:
"""Patch UpnpFactory."""
with patch(
"homeassistant.components.samsungtv.media_player.UpnpFactory",
autospec=True,
) as upnp_factory_class:
upnp_factory: Mock = upnp_factory_class.return_value
upnp_factory.async_create_device.side_effect = UpnpConnectionError
yield upnp_factory
@pytest.fixture(name="upnp_device")
async def upnp_device_fixture(upnp_factory: Mock) -> Mock:
"""Patch async_upnp_client."""
upnp_device = Mock(UpnpDevice)
upnp_device.services = {}
with patch.object(upnp_factory, "async_create_device", side_effect=[upnp_device]):
yield upnp_device
@pytest.fixture(name="remote")
def remote_fixture() -> Mock:
"""Patch the samsungctl Remote."""

View File

@ -4,6 +4,7 @@ from datetime import datetime, timedelta
import logging
from unittest.mock import DEFAULT as DEFAULT_MOCK, AsyncMock, Mock, call, patch
from async_upnp_client.exceptions import UpnpActionResponseError
import pytest
from samsungctl import exceptions
from samsungtvws.async_remote import SamsungTVWSAsyncRemote
@ -21,6 +22,7 @@ from homeassistant.components.media_player.const import (
ATTR_INPUT_SOURCE,
ATTR_MEDIA_CONTENT_ID,
ATTR_MEDIA_CONTENT_TYPE,
ATTR_MEDIA_VOLUME_LEVEL,
ATTR_MEDIA_VOLUME_MUTED,
DOMAIN,
MEDIA_TYPE_APP,
@ -33,13 +35,17 @@ from homeassistant.components.media_player.const import (
from homeassistant.components.samsungtv.const import (
CONF_MODEL,
CONF_ON_ACTION,
CONF_SSDP_RENDERING_CONTROL_LOCATION,
DOMAIN as SAMSUNGTV_DOMAIN,
ENCRYPTED_WEBSOCKET_PORT,
METHOD_ENCRYPTED_WEBSOCKET,
METHOD_WEBSOCKET,
TIMEOUT_WEBSOCKET,
)
from homeassistant.components.samsungtv.media_player import SUPPORT_SAMSUNGTV
from homeassistant.components.samsungtv.media_player import (
SUPPORT_SAMSUNGTV,
UPNP_SVC_RENDERINGCONTROL,
)
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_ENTITY_ID,
@ -62,6 +68,7 @@ from homeassistant.const import (
SERVICE_TURN_ON,
SERVICE_VOLUME_DOWN,
SERVICE_VOLUME_MUTE,
SERVICE_VOLUME_SET,
SERVICE_VOLUME_UP,
STATE_OFF,
STATE_ON,
@ -73,7 +80,7 @@ from homeassistant.helpers.typing import ConfigType
from homeassistant.setup import async_setup_component
import homeassistant.util.dt as dt_util
from . import setup_samsungtv_entry
from . import setup_samsungtv_entry, upnp_get_action_mock
from .const import (
MOCK_ENTRYDATA_ENCRYPTED_WS,
SAMPLE_DEVICE_INFO_FRAME,
@ -119,6 +126,7 @@ MOCK_ENTRY_WS = {
CONF_NAME: "fake",
CONF_PORT: 8001,
CONF_TOKEN: "123456789",
CONF_SSDP_RENDERING_CONTROL_LOCATION: "https://any",
}
@ -1304,3 +1312,81 @@ async def test_websocket_unsupported_remote_control(
assert entry.data[CONF_PORT] == ENCRYPTED_WEBSOCKET_PORT
state = hass.states.get(ENTITY_ID)
assert state.state == STATE_UNAVAILABLE
@pytest.mark.usefixtures("remotews")
async def test_volume_control_upnp(
hass: HomeAssistant, upnp_device: Mock, caplog: pytest.LogCaptureFixture
) -> None:
"""Test for Upnp volume control."""
upnp_get_volume = upnp_get_action_mock(
upnp_device, UPNP_SVC_RENDERINGCONTROL, "GetVolume"
)
upnp_get_volume.async_call.return_value = {"CurrentVolume": 44}
upnp_get_mute = upnp_get_action_mock(
upnp_device, UPNP_SVC_RENDERINGCONTROL, "GetMute"
)
upnp_get_mute.async_call.return_value = {"CurrentMute": False}
await setup_samsungtv_entry(hass, MOCK_ENTRY_WS)
upnp_get_volume.async_call.assert_called_once()
upnp_get_mute.async_call.assert_called_once()
# Upnp action succeeds
upnp_set_volume = upnp_get_action_mock(
upnp_device, UPNP_SVC_RENDERINGCONTROL, "SetVolume"
)
assert await hass.services.async_call(
DOMAIN,
SERVICE_VOLUME_SET,
{ATTR_ENTITY_ID: ENTITY_ID, ATTR_MEDIA_VOLUME_LEVEL: 0.5},
True,
)
assert "Unable to set volume level on" not in caplog.text
# Upnp action failed
upnp_set_volume.async_call.side_effect = UpnpActionResponseError(
status=500, error_code=501, error_desc="Action Failed"
)
assert await hass.services.async_call(
DOMAIN,
SERVICE_VOLUME_SET,
{ATTR_ENTITY_ID: ENTITY_ID, ATTR_MEDIA_VOLUME_LEVEL: 0.6},
True,
)
assert "Unable to set volume level on" in caplog.text
@pytest.mark.usefixtures("remotews")
async def test_upnp_not_available(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test for volume control when Upnp is not available."""
await setup_samsungtv_entry(hass, MOCK_ENTRY_WS)
# Upnp action fails
assert await hass.services.async_call(
DOMAIN,
SERVICE_VOLUME_SET,
{ATTR_ENTITY_ID: ENTITY_ID, ATTR_MEDIA_VOLUME_LEVEL: 0.6},
True,
)
assert "Upnp services are not available" in caplog.text
@pytest.mark.usefixtures("remotews", "upnp_device")
async def test_upnp_missing_service(
hass: HomeAssistant, caplog: pytest.LogCaptureFixture
) -> None:
"""Test for volume control when Upnp is not available."""
await setup_samsungtv_entry(hass, MOCK_ENTRY_WS)
# Upnp action fails
assert await hass.services.async_call(
DOMAIN,
SERVICE_VOLUME_SET,
{ATTR_ENTITY_ID: ENTITY_ID, ATTR_MEDIA_VOLUME_LEVEL: 0.6},
True,
)
assert f"Upnp service {UPNP_SVC_RENDERINGCONTROL} is not available" in caplog.text