Import flow prompts user when device is uncontactable during migration

When config flow is able to contact a device, or when it has information
from SSDP, it will create config entries without error. If the device is
uncontactable at this point then it will appear as unavailable in HA
until it is turned on again.

When import flow cannot migrate an entry because it needs to contact the
device and can't, it will notify the user with a config flow form.
This commit is contained in:
Michael "Chishm" Chisholm
2021-10-02 21:47:01 +10:00
parent 09b02618b1
commit a4961c0755
8 changed files with 269 additions and 308 deletions

View File

@@ -12,7 +12,7 @@ from async_upnp_client.profiles.dlna import DmrDevice
from async_upnp_client.profiles.profile import find_device_of_type from async_upnp_client.profiles.profile import find_device_of_type
import voluptuous as vol import voluptuous as vol
from homeassistant import config_entries from homeassistant import config_entries, data_entry_flow
from homeassistant.components import ssdp from homeassistant.components import ssdp
from homeassistant.const import CONF_DEVICE_ID, CONF_NAME, CONF_TYPE, CONF_URL from homeassistant.const import CONF_DEVICE_ID, CONF_NAME, CONF_TYPE, CONF_URL
from homeassistant.core import callback from homeassistant.core import callback
@@ -25,6 +25,7 @@ from .const import (
CONF_CALLBACK_URL_OVERRIDE, CONF_CALLBACK_URL_OVERRIDE,
CONF_LISTEN_PORT, CONF_LISTEN_PORT,
CONF_POLL_AVAILABILITY, CONF_POLL_AVAILABILITY,
DEFAULT_NAME,
DOMAIN, DOMAIN,
) )
from .data import get_domain_data from .data import get_domain_data
@@ -51,6 +52,11 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self) -> None: def __init__(self) -> None:
"""Initialize flow.""" """Initialize flow."""
self._discoveries: list[Mapping[str, str]] = [] self._discoveries: list[Mapping[str, str]] = []
self._location: str | None = None
self._udn: str | None = None
self._device_type: str | None = None
self._name: str | None = None
self._options: dict[str, Any] = {}
@staticmethod @staticmethod
@callback @callback
@@ -68,22 +74,18 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
""" """
LOGGER.debug("async_step_user: user_input: %s", user_input) LOGGER.debug("async_step_user: user_input: %s", user_input)
# Device setup manually, assume we don't get SSDP broadcast notifications
self._options[CONF_POLL_AVAILABILITY] = True
errors = {} errors = {}
if user_input is not None: if user_input is not None:
self._location = user_input[CONF_URL]
try: try:
discovery = await self._async_connect(user_input[CONF_URL]) await self._async_connect()
except ConnectError as err: except ConnectError as err:
errors["base"] = err.args[0] errors["base"] = err.args[0]
else: else:
# If unmigrated config was imported earlier then use it return await self._async_create_entry()
import_data = get_domain_data(self.hass).unmigrated_config.get(
user_input[CONF_URL]
)
if import_data is not None:
return await self.async_step_import(import_data)
# Device setup manually, assume we don't get SSDP broadcast notifications
options = {CONF_POLL_AVAILABILITY: True}
return await self._async_create_entry_from_discovery(discovery, options)
data_schema = vol.Schema({CONF_URL: str}) data_schema = vol.Schema({CONF_URL: str})
return self.async_show_form( return self.async_show_form(
@@ -93,9 +95,10 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_import(self, import_data: FlowInput = None) -> FlowResult: async def async_step_import(self, import_data: FlowInput = None) -> FlowResult:
"""Import a new DLNA DMR device from a config entry. """Import a new DLNA DMR device from a config entry.
This flow is triggered by `async_setup`. If no device has been This flow is triggered by `async_setup_platform`. If the device has not
configured before, find any matching device and create a config_entry been migrated, and can be connected to, automatically import it. If it
for it. Otherwise, do nothing. cannot be connected to, prompt the user to turn it on. If it has already
been migrated, do nothing.
""" """
LOGGER.debug("async_step_import: import_data: %s", import_data) LOGGER.debug("async_step_import: import_data: %s", import_data)
@@ -103,123 +106,171 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
LOGGER.debug("Entry not imported: incomplete_config") LOGGER.debug("Entry not imported: incomplete_config")
return self.async_abort(reason="incomplete_config") return self.async_abort(reason="incomplete_config")
self._async_abort_entries_match({CONF_URL: import_data[CONF_URL]}) self._location = import_data[CONF_URL]
self._async_abort_entries_match({CONF_URL: self._location})
location = import_data[CONF_URL] # Use the location as this config flow's unique ID until UDN is known
self._discoveries = await self._async_get_discoveries() await self.async_set_unique_id(self._location)
poll_availability = True
# Find the device in the list of unconfigured devices
for discovery in self._discoveries:
if discovery[ssdp.ATTR_SSDP_LOCATION] == location:
# Device found via SSDP, it shouldn't need polling
poll_availability = False
LOGGER.debug(
"Entry %s found via SSDP, with UDN %s",
import_data[CONF_URL],
discovery[ssdp.ATTR_SSDP_UDN],
)
break
else:
# Not in discoveries. Try connecting directly.
try:
discovery = await self._async_connect(location)
except ConnectError as err:
LOGGER.debug(
"Entry %s not imported: %s", import_data[CONF_URL], err.args[0]
)
# Store the config to apply if the device is added later
get_domain_data(self.hass).unmigrated_config[location] = import_data
return self.async_abort(reason=err.args[0])
# Set options from the import_data, except listen_ip which is no longer used # Set options from the import_data, except listen_ip which is no longer used
options = { self._options[CONF_LISTEN_PORT] = import_data.get(CONF_LISTEN_PORT)
CONF_LISTEN_PORT: import_data.get(CONF_LISTEN_PORT), self._options[CONF_CALLBACK_URL_OVERRIDE] = import_data.get(
CONF_CALLBACK_URL_OVERRIDE: import_data.get(CONF_CALLBACK_URL_OVERRIDE), CONF_CALLBACK_URL_OVERRIDE
CONF_POLL_AVAILABILITY: poll_availability, )
}
# Override device name if it's set in the YAML # Override device name if it's set in the YAML
if CONF_NAME in import_data: self._name = import_data.get(CONF_NAME)
discovery = dict(discovery)
discovery[ssdp.ATTR_UPNP_FRIENDLY_NAME] = import_data[CONF_NAME]
LOGGER.debug("Entry %s ready for import", import_data[CONF_URL]) discoveries = await self._async_get_discoveries()
return await self._async_create_entry_from_discovery(discovery, options)
# Find the device in the list of unconfigured devices
for discovery in discoveries:
if discovery[ssdp.ATTR_SSDP_LOCATION] == self._location:
# Device found via SSDP, it shouldn't need polling
self._options[CONF_POLL_AVAILABILITY] = False
# Discovery info has everything required to create config entry
self._set_info_from_discovery(discovery)
LOGGER.debug(
"Entry %s found via SSDP, with UDN %s",
self._location,
self._udn,
)
return await self._async_create_entry()
# This device will need to be polled
self._options[CONF_POLL_AVAILABILITY] = True
# Device was not found via SSDP, connect directly for configuration
try:
await self._async_connect()
except ConnectError as err:
# This will require user action
LOGGER.debug("Entry %s not imported yet: %s", self._location, err.args[0])
return await self.async_step_import_turn_on()
LOGGER.debug("Entry %s ready for import", self._location)
return await self._async_create_entry()
async def async_step_import_turn_on(
self, user_input: FlowInput = None
) -> FlowResult:
"""Request the user to turn on the device so that import can finish."""
LOGGER.debug("async_step_import_turn_on: %s", user_input)
self.context["title_placeholders"] = {"name": self._name or self._location}
errors = {}
if user_input is not None:
try:
await self._async_connect()
except ConnectError as err:
errors["base"] = err.args[0]
else:
return await self._async_create_entry()
self._set_confirm_only()
return self.async_show_form(step_id="import_turn_on", errors=errors)
async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult: async def async_step_ssdp(self, discovery_info: DiscoveryInfoType) -> FlowResult:
"""Handle a flow initialized by SSDP discovery.""" """Handle a flow initialized by SSDP discovery."""
LOGGER.debug("async_step_ssdp: discovery_info %s", pformat(discovery_info)) LOGGER.debug("async_step_ssdp: discovery_info %s", pformat(discovery_info))
self._discoveries = [discovery_info] self._location = discovery_info[ssdp.ATTR_SSDP_LOCATION]
self._udn = discovery_info[ssdp.ATTR_SSDP_UDN]
udn = discovery_info[ssdp.ATTR_SSDP_UDN]
location = discovery_info[ssdp.ATTR_SSDP_LOCATION]
# Abort if already configured, but update the last-known location # Abort if already configured, but update the last-known location
await self.async_set_unique_id(udn) await self.async_set_unique_id(self._udn)
self._abort_if_unique_id_configured( self._abort_if_unique_id_configured(
updates={CONF_URL: location}, reload_on_update=False updates={CONF_URL: self._location}, reload_on_update=False
) )
# If the device needs migration because it wasn't turned on when HA # Abort if a migration flow for the device's location is in progress
# started, silently migrate it now. for progress in self._async_in_progress(include_uninitialized=True):
import_data = get_domain_data(self.hass).unmigrated_config.get(location) if progress["context"].get("unique_id") == self._location:
if import_data is not None: LOGGER.debug(
return await self.async_step_import(import_data) "Aborting SSDP setup because migration for %s is in progress",
self._location,
)
raise data_entry_flow.AbortFlow("already_in_progress")
parsed_url = urlparse(location) self._set_info_from_discovery(discovery_info)
name = discovery_info.get(ssdp.ATTR_UPNP_FRIENDLY_NAME) or parsed_url.hostname self.context["title_placeholders"] = {"name": self._name}
self.context["title_placeholders"] = {"name": name}
return await self.async_step_confirm() return await self.async_step_confirm()
async def async_step_confirm(self, user_input: FlowInput = None) -> FlowResult: async def async_step_confirm(self, user_input: FlowInput = None) -> FlowResult:
"""Allow the user to confirm adding the device. """Allow the user to confirm adding the device."""
Also check that the device is still available, otherwise when it is
added to HA it won't report the correct DeviceInfo.
"""
LOGGER.debug("async_step_confirm: %s", user_input) LOGGER.debug("async_step_confirm: %s", user_input)
errors = {}
if user_input is not None: if user_input is not None:
discovery = self._discoveries[0] return await self._async_create_entry()
try:
await self._async_connect(discovery[ssdp.ATTR_SSDP_LOCATION])
except ConnectError as err:
errors["base"] = err.args[0]
else:
return await self._async_create_entry_from_discovery(discovery)
self._set_confirm_only() self._set_confirm_only()
return self.async_show_form(step_id="confirm", errors=errors) return self.async_show_form(step_id="confirm")
async def _async_create_entry_from_discovery( async def _async_connect(self) -> None:
self, """Connect to a device to confirm it works and gather extra information.
discovery: Mapping[str, Any],
options: Mapping[str, Any] | None = None,
) -> FlowResult:
"""Create an entry from discovery."""
LOGGER.debug("_async_create_entry_from_discovery: discovery: %s", discovery)
location = discovery[ssdp.ATTR_SSDP_LOCATION] Updates this flow's unique ID to the device UDN if not already done.
udn = discovery[ssdp.ATTR_SSDP_UDN] Raises ConnectError if something goes wrong.
"""
LOGGER.debug("_async_connect: location: %s", self._location)
assert self._location, "self._location has not been set before connect"
domain_data = get_domain_data(self.hass)
try:
device = await domain_data.upnp_factory.async_create_device(self._location)
except UpnpError as err:
raise ConnectError("could_not_connect") from err
try:
device = find_device_of_type(device, DmrDevice.DEVICE_TYPES)
except UpnpError as err:
raise ConnectError("not_dmr") from err
if not self._udn:
self._udn = device.udn
if not self._device_type:
self._device_type = device.device_type
if not self._name:
self._name = device.name
async def _async_create_entry(self) -> FlowResult:
"""Create a config entry, assuming all required information is now known."""
LOGGER.debug(
"_async_create_entry: location: %s, UDN: %s", self._location, self._udn
)
assert self._location
assert self._udn
assert self._device_type
# Abort if already configured, but update the last-known location # Abort if already configured, but update the last-known location
await self.async_set_unique_id(udn) await self.async_set_unique_id(self._udn)
self._abort_if_unique_id_configured(updates={CONF_URL: location}) self._abort_if_unique_id_configured(updates={CONF_URL: self._location})
parsed_url = urlparse(location)
title = discovery.get(ssdp.ATTR_UPNP_FRIENDLY_NAME) or parsed_url.hostname
title = self._name or urlparse(self._location).hostname or DEFAULT_NAME
data = { data = {
CONF_URL: discovery[ssdp.ATTR_SSDP_LOCATION], CONF_URL: self._location,
CONF_DEVICE_ID: discovery[ssdp.ATTR_SSDP_UDN], CONF_DEVICE_ID: self._udn,
CONF_TYPE: discovery.get(ssdp.ATTR_SSDP_NT) or discovery[ssdp.ATTR_SSDP_ST], CONF_TYPE: self._device_type,
} }
return self.async_create_entry(title=title, data=data, options=options) return self.async_create_entry(title=title, data=data, options=self._options)
def _set_info_from_discovery(self, discovery_info: Mapping[str, Any]) -> None:
"""Set information required for a config entry from the SSDP discovery."""
assert self._location
self._udn = discovery_info[ssdp.ATTR_SSDP_UDN]
self._device_type = (
discovery_info.get(ssdp.ATTR_SSDP_NT) or discovery_info[ssdp.ATTR_SSDP_ST]
)
self._name = (
discovery_info.get(ssdp.ATTR_UPNP_FRIENDLY_NAME)
or urlparse(self._location).hostname
or DEFAULT_NAME
)
async def _async_get_discoveries(self) -> list[Mapping[str, str]]: async def _async_get_discoveries(self) -> list[Mapping[str, str]]:
"""Get list of unconfigured DLNA devices discovered by SSDP.""" """Get list of unconfigured DLNA devices discovered by SSDP."""
@@ -245,32 +296,6 @@ class DlnaDmrFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
return discoveries return discoveries
async def _async_connect(self, location: str) -> dict[str, str]:
"""Connect to a device to confirm it works and get discovery information.
Raises ConnectError if something goes wrong.
"""
LOGGER.debug("_async_connect(location=%s)", location)
domain_data = get_domain_data(self.hass)
try:
device = await domain_data.upnp_factory.async_create_device(location)
except UpnpError as err:
raise ConnectError("could_not_connect") from err
try:
device = find_device_of_type(device, DmrDevice.DEVICE_TYPES)
except UpnpError as err:
raise ConnectError("not_dmr") from err
discovery = {
ssdp.ATTR_SSDP_LOCATION: location,
ssdp.ATTR_SSDP_UDN: device.udn,
ssdp.ATTR_SSDP_ST: device.device_type,
ssdp.ATTR_UPNP_FRIENDLY_NAME: device.name,
}
return discovery
class DlnaDmrOptionsFlowHandler(config_entries.OptionsFlow): class DlnaDmrOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle a DLNA DMR options flow. """Handle a DLNA DMR options flow.

View File

@@ -3,8 +3,7 @@ from __future__ import annotations
import asyncio import asyncio
from collections import defaultdict from collections import defaultdict
from collections.abc import Mapping from typing import NamedTuple, cast
from typing import Any, NamedTuple, cast
from async_upnp_client import UpnpEventHandler, UpnpFactory, UpnpRequester from async_upnp_client import UpnpEventHandler, UpnpFactory, UpnpRequester
from async_upnp_client.aiohttp import AiohttpNotifyServer, AiohttpSessionRequester from async_upnp_client.aiohttp import AiohttpNotifyServer, AiohttpSessionRequester
@@ -33,7 +32,6 @@ class DlnaDmrData:
event_notifiers: dict[EventListenAddr, AiohttpNotifyServer] event_notifiers: dict[EventListenAddr, AiohttpNotifyServer]
event_notifier_refs: defaultdict[EventListenAddr, int] event_notifier_refs: defaultdict[EventListenAddr, int]
stop_listener_remove: CALLBACK_TYPE | None = None stop_listener_remove: CALLBACK_TYPE | None = None
unmigrated_config: dict[str, Mapping[str, Any]]
def __init__(self, hass: HomeAssistant) -> None: def __init__(self, hass: HomeAssistant) -> None:
"""Initialize global data.""" """Initialize global data."""
@@ -43,7 +41,6 @@ class DlnaDmrData:
self.upnp_factory = UpnpFactory(self.requester, non_strict=True) self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
self.event_notifiers = {} self.event_notifiers = {}
self.event_notifier_refs = defaultdict(int) self.event_notifier_refs = defaultdict(int)
self.unmigrated_config = {}
async def async_cleanup_event_notifiers(self, event: Event) -> None: async def async_cleanup_event_notifiers(self, event: Event) -> None:
"""Clean up resources when Home Assistant is stopped.""" """Clean up resources when Home Assistant is stopped."""

View File

@@ -44,12 +44,10 @@ import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity_platform import AddEntitiesCallback from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from .const import ( from .const import (
CONF_CALLBACK_URL_OVERRIDE, CONF_CALLBACK_URL_OVERRIDE,
CONF_LISTEN_PORT, CONF_LISTEN_PORT,
CONF_POLL_AVAILABILITY, CONF_POLL_AVAILABILITY,
DEFAULT_NAME,
DOMAIN, DOMAIN,
LOGGER as _LOGGER, LOGGER as _LOGGER,
MEDIA_TYPE_MAP, MEDIA_TYPE_MAP,
@@ -71,7 +69,7 @@ PLATFORM_SCHEMA = vol.All(
vol.Required(CONF_URL): cv.string, vol.Required(CONF_URL): cv.string,
vol.Optional(CONF_LISTEN_IP): cv.string, vol.Optional(CONF_LISTEN_IP): cv.string,
vol.Optional(CONF_LISTEN_PORT): cv.port, vol.Optional(CONF_LISTEN_PORT): cv.port,
vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string, vol.Optional(CONF_NAME): cv.string,
vol.Optional(CONF_CALLBACK_URL_OVERRIDE): cv.url, vol.Optional(CONF_CALLBACK_URL_OVERRIDE): cv.url,
} }
), ),
@@ -119,7 +117,8 @@ async def async_setup_platform(
_LOGGER.warning( _LOGGER.warning(
"Configuring dlna_dmr via yaml is deprecated; the configuration for" "Configuring dlna_dmr via yaml is deprecated; the configuration for"
" %s has been migrated to a config entry and can be safely removed", " %s will be migrated to a config entry and can be safely removed when"
"migration is complete",
config.get(CONF_URL), config.get(CONF_URL),
) )

View File

@@ -9,6 +9,9 @@
"url": "[%key:common::config_flow::data::url%]" "url": "[%key:common::config_flow::data::url%]"
} }
}, },
"import_turn_on": {
"description": "Please turn on the device and click submit to continue migration"
},
"confirm": { "confirm": {
"description": "[%key:common::config_flow::description::confirm_setup%]" "description": "[%key:common::config_flow::description::confirm_setup%]"
} }

View File

@@ -17,6 +17,9 @@
"confirm": { "confirm": {
"description": "Do you want to start set up?" "description": "Do you want to start set up?"
}, },
"import_turn_on": {
"description": "Please turn on the device and click confirm to continue migration"
},
"user": { "user": {
"data": { "data": {
"url": "URL" "url": "URL"

View File

@@ -52,8 +52,6 @@ def domain_data_mock(hass: HomeAssistant) -> Iterable[Mock]:
seal(upnp_device) seal(upnp_device)
domain_data.upnp_factory.async_create_device.return_value = upnp_device domain_data.upnp_factory.async_create_device.return_value = upnp_device
domain_data.unmigrated_config = {}
with patch.dict(hass.data, {DLNA_DOMAIN: domain_data}): with patch.dict(hass.data, {DLNA_DOMAIN: domain_data}):
yield domain_data yield domain_data

View File

@@ -194,30 +194,6 @@ async def test_import_flow_invalid(hass: HomeAssistant, domain_data_mock: Mock)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "incomplete_config" assert result["reason"] == "incomplete_config"
# Device is not contactable
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_PLATFORM: DLNA_DOMAIN, CONF_URL: MOCK_DEVICE_LOCATION},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "could_not_connect"
# Device is the wrong type
domain_data_mock.upnp_factory.async_create_device.side_effect = None
upnp_device = domain_data_mock.upnp_factory.async_create_device.return_value
upnp_device.device_type = WRONG_DEVICE_TYPE
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={CONF_PLATFORM: DLNA_DOMAIN, CONF_URL: MOCK_DEVICE_LOCATION},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "not_dmr"
async def test_import_flow_ssdp_discovered( async def test_import_flow_ssdp_discovered(
hass: HomeAssistant, ssdp_scanner_mock: Mock hass: HomeAssistant, ssdp_scanner_mock: Mock
@@ -316,6 +292,85 @@ async def test_import_flow_direct_connect(
} }
async def test_import_flow_offline(
hass: HomeAssistant, domain_data_mock: Mock, ssdp_scanner_mock: Mock
) -> None:
"""Test import flow of offline device."""
# Device is not yet contactable
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_NAME: IMPORTED_DEVICE_NAME,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_LISTEN_PORT: 2222,
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "import_turn_on"
import_flow_id = result["flow_id"]
# User clicks submit, same form is displayed with an error
result = await hass.config_entries.flow.async_configure(
import_flow_id, user_input={}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {"base": "could_not_connect"}
assert result["step_id"] == "import_turn_on"
# Device is discovered via SSDP, new flow should not be initialized
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[MOCK_DISCOVERY],
[],
[],
]
domain_data_mock.upnp_factory.async_create_device.side_effect = None
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "already_in_progress"
# User clicks submit, config entry should be created
result = await hass.config_entries.flow.async_configure(
import_flow_id, user_input={}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == IMPORTED_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
# Options should be retained
assert result["options"] == {
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: True,
}
# Wait for platform to be fully setup
await hass.async_block_till_done()
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_import_flow_options( async def test_import_flow_options(
hass: HomeAssistant, ssdp_scanner_mock: Mock hass: HomeAssistant, ssdp_scanner_mock: Mock
) -> None: ) -> None:
@@ -358,128 +413,6 @@ async def test_import_flow_options(
} }
async def test_import_flow_deferred_ssdp(
hass: HomeAssistant, domain_data_mock: Mock, ssdp_scanner_mock: Mock
) -> None:
"""Test YAML import of unavailable device later found via SSDP."""
# Attempted import at hass start fails because device is unavailable
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[],
[],
[],
]
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_NAME: IMPORTED_DEVICE_NAME,
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "could_not_connect"
# Device becomes available then discovered via SSDP, import now occurs automatically
ssdp_scanner_mock.async_get_discovery_info_by_st.side_effect = [
[MOCK_DISCOVERY],
[],
[],
]
domain_data_mock.upnp_factory.async_create_device.side_effect = None
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_SSDP},
data=MOCK_DISCOVERY,
)
await hass.async_block_till_done()
assert hass.config_entries.flow.async_progress(include_uninitialized=True) == []
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == IMPORTED_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: False,
}
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_import_flow_deferred_user(
hass: HomeAssistant, domain_data_mock: Mock, ssdp_scanner_mock: Mock
) -> None:
"""Test YAML import of unavailable device later added by user."""
# Attempted import at hass start fails because device is unavailable
ssdp_scanner_mock.async_get_discovery_info_by_st.return_value = []
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN,
context={"source": config_entries.SOURCE_IMPORT},
data={
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_NAME: IMPORTED_DEVICE_NAME,
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
},
)
assert result["type"] == data_entry_flow.RESULT_TYPE_ABORT
assert result["reason"] == "could_not_connect"
# Device becomes available then added by user, use all imported settings
domain_data_mock.upnp_factory.async_create_device.side_effect = None
result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN, context={"source": config_entries.SOURCE_USER}
)
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "user"
result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={CONF_URL: MOCK_DEVICE_LOCATION}
)
await hass.async_block_till_done()
assert hass.config_entries.flow.async_progress(include_uninitialized=True) == []
assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == IMPORTED_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {
CONF_LISTEN_PORT: 2222,
CONF_CALLBACK_URL_OVERRIDE: "http://override/callback",
CONF_POLL_AVAILABILITY: True,
}
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_ssdp_flow_success(hass: HomeAssistant) -> None: async def test_ssdp_flow_success(hass: HomeAssistant) -> None:
"""Test that SSDP discovery with an available device works.""" """Test that SSDP discovery with an available device works."""
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
@@ -488,7 +421,6 @@ async def test_ssdp_flow_success(hass: HomeAssistant) -> None:
data=MOCK_DISCOVERY, data=MOCK_DISCOVERY,
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "confirm" assert result["step_id"] == "confirm"
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
@@ -515,10 +447,10 @@ async def test_ssdp_flow_success(hass: HomeAssistant) -> None:
async def test_ssdp_flow_unavailable( async def test_ssdp_flow_unavailable(
hass: HomeAssistant, domain_data_mock: Mock hass: HomeAssistant, domain_data_mock: Mock
) -> None: ) -> None:
"""Test that SSDP discovery with an unavailable device gives an error message. """Test that SSDP discovery with an unavailable device still succeeds.
This may occur if the device is turned on, discovered, then turned off All the required information for configuration is obtained from the SSDP
before the user attempts to add it. message, there's no need to connect to the device to configure it.
""" """
result = await hass.config_entries.flow.async_init( result = await hass.config_entries.flow.async_init(
DLNA_DOMAIN, DLNA_DOMAIN,
@@ -526,7 +458,6 @@ async def test_ssdp_flow_unavailable(
data=MOCK_DISCOVERY, data=MOCK_DISCOVERY,
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM assert result["type"] == data_entry_flow.RESULT_TYPE_FORM
assert result["errors"] == {}
assert result["step_id"] == "confirm" assert result["step_id"] == "confirm"
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
@@ -534,9 +465,22 @@ async def test_ssdp_flow_unavailable(
result = await hass.config_entries.flow.async_configure( result = await hass.config_entries.flow.async_configure(
result["flow_id"], user_input={} result["flow_id"], user_input={}
) )
assert result["type"] == data_entry_flow.RESULT_TYPE_FORM await hass.async_block_till_done()
assert result["errors"] == {"base": "could_not_connect"}
assert result["step_id"] == "confirm" assert result["type"] == data_entry_flow.RESULT_TYPE_CREATE_ENTRY
assert result["title"] == MOCK_DEVICE_NAME
assert result["data"] == {
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_DEVICE_ID: MOCK_DEVICE_UDN,
CONF_TYPE: MOCK_DEVICE_TYPE,
}
assert result["options"] == {}
# Remove the device to clean up all resources, completing its life cycle
entry_id = result["result"].entry_id
assert await hass.config_entries.async_remove(entry_id) == {
"require_restart": False
}
async def test_ssdp_flow_existing( async def test_ssdp_flow_existing(

View File

@@ -116,6 +116,10 @@ async def test_setup_platform_import_flow_started(
hass: HomeAssistant, domain_data_mock: Mock hass: HomeAssistant, domain_data_mock: Mock
) -> None: ) -> None:
"""Test import flow of YAML config is started if there's config data.""" """Test import flow of YAML config is started if there's config data."""
# Cause connection attempts to fail
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpConnectionError
# Run the setup
mock_config: ConfigType = { mock_config: ConfigType = {
MP_DOMAIN: [ MP_DOMAIN: [
{ {
@@ -126,30 +130,18 @@ async def test_setup_platform_import_flow_started(
] ]
} }
# Device is not available yet
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError
# Run the setup
await async_setup_component(hass, MP_DOMAIN, mock_config) await async_setup_component(hass, MP_DOMAIN, mock_config)
await hass.async_block_till_done() await hass.async_block_till_done()
# Check config_flow has completed # Check config_flow has started
assert hass.config_entries.flow.async_progress(include_uninitialized=True) == [] flows = hass.config_entries.flow.async_progress(include_uninitialized=True)
assert len(flows) == 1
# Check device contact attempt was made # It should be paused, waiting for the user to turn on the device
domain_data_mock.upnp_factory.async_create_device.assert_awaited_once_with( flow = flows[0]
MOCK_DEVICE_LOCATION assert flow["handler"] == "dlna_dmr"
) assert flow["step_id"] == "import_turn_on"
assert flow["context"].get("unique_id") == MOCK_DEVICE_LOCATION
# Check the device is added to the unmigrated configs
assert domain_data_mock.unmigrated_config == {
MOCK_DEVICE_LOCATION: {
CONF_PLATFORM: DLNA_DOMAIN,
CONF_URL: MOCK_DEVICE_LOCATION,
CONF_LISTEN_PORT: 1234,
CONF_NAME: DEFAULT_NAME,
}
}
async def test_setup_entry_no_options( async def test_setup_entry_no_options(