mirror of
https://github.com/home-assistant/core.git
synced 2026-04-21 09:01:10 +02:00
Implement reconfiguration flow for UniFi Protect integration (#157532)
Co-authored-by: J. Nick Koston <nick@koston.org>
This commit is contained in:
@@ -31,6 +31,7 @@ from homeassistant.const import (
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers import selector
|
||||
from homeassistant.helpers.aiohttp_client import (
|
||||
async_create_clientsession,
|
||||
async_get_clientsession,
|
||||
@@ -56,15 +57,113 @@ from .const import (
|
||||
)
|
||||
from .data import UFPConfigEntry, async_last_update_was_successful
|
||||
from .discovery import async_start_discovery
|
||||
from .utils import _async_resolve, _async_short_mac, _async_unifi_mac_from_hass
|
||||
from .utils import (
|
||||
_async_resolve,
|
||||
_async_short_mac,
|
||||
_async_unifi_mac_from_hass,
|
||||
async_create_api_client,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _filter_empty_credentials(user_input: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Filter out empty credential fields to preserve existing values."""
|
||||
return {k: v for k, v in user_input.items() if v not in (None, "")}
|
||||
|
||||
|
||||
def _normalize_port(data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Ensure port is stored as int (NumberSelector returns float)."""
|
||||
return {**data, CONF_PORT: int(data.get(CONF_PORT, DEFAULT_PORT))}
|
||||
|
||||
|
||||
def _build_data_without_credentials(entry_data: Mapping[str, Any]) -> dict[str, Any]:
|
||||
"""Build form data from existing config entry, excluding sensitive credentials."""
|
||||
return {
|
||||
CONF_HOST: entry_data[CONF_HOST],
|
||||
CONF_PORT: entry_data[CONF_PORT],
|
||||
CONF_VERIFY_SSL: entry_data[CONF_VERIFY_SSL],
|
||||
CONF_USERNAME: entry_data[CONF_USERNAME],
|
||||
}
|
||||
|
||||
|
||||
async def _async_clear_session_if_credentials_changed(
|
||||
hass: HomeAssistant,
|
||||
entry: UFPConfigEntry,
|
||||
new_data: Mapping[str, Any],
|
||||
) -> None:
|
||||
"""Clear stored session if credentials have changed to force fresh authentication."""
|
||||
existing_data = entry.data
|
||||
if existing_data.get(CONF_USERNAME) != new_data.get(
|
||||
CONF_USERNAME
|
||||
) or existing_data.get(CONF_PASSWORD) != new_data.get(CONF_PASSWORD):
|
||||
_LOGGER.debug("Credentials changed, clearing stored session")
|
||||
protect = async_create_api_client(hass, entry)
|
||||
try:
|
||||
await protect.clear_session()
|
||||
except Exception as ex: # noqa: BLE001
|
||||
_LOGGER.debug("Failed to clear session, continuing anyway: %s", ex)
|
||||
|
||||
|
||||
ENTRY_FAILURE_STATES = (
|
||||
ConfigEntryState.SETUP_ERROR,
|
||||
ConfigEntryState.SETUP_RETRY,
|
||||
)
|
||||
|
||||
# Selectors for config flow form fields
|
||||
_TEXT_SELECTOR = selector.TextSelector()
|
||||
_PASSWORD_SELECTOR = selector.TextSelector(
|
||||
selector.TextSelectorConfig(type=selector.TextSelectorType.PASSWORD)
|
||||
)
|
||||
_PORT_SELECTOR = selector.NumberSelector(
|
||||
selector.NumberSelectorConfig(
|
||||
mode=selector.NumberSelectorMode.BOX, min=1, max=65535
|
||||
)
|
||||
)
|
||||
_BOOL_SELECTOR = selector.BooleanSelector()
|
||||
|
||||
|
||||
def _build_schema(
|
||||
*,
|
||||
include_host: bool = True,
|
||||
include_connection: bool = True,
|
||||
credentials_optional: bool = False,
|
||||
) -> vol.Schema:
|
||||
"""Build a config flow schema.
|
||||
|
||||
Args:
|
||||
include_host: Include host field (False when host comes from discovery).
|
||||
include_connection: Include port/verify_ssl fields.
|
||||
credentials_optional: Credentials optional (True to keep existing values).
|
||||
|
||||
"""
|
||||
req, opt = vol.Required, vol.Optional
|
||||
cred_key = opt if credentials_optional else req
|
||||
|
||||
schema: dict[vol.Marker, selector.Selector] = {}
|
||||
if include_host:
|
||||
schema[req(CONF_HOST)] = _TEXT_SELECTOR
|
||||
if include_connection:
|
||||
schema[req(CONF_PORT, default=DEFAULT_PORT)] = _PORT_SELECTOR
|
||||
schema[req(CONF_VERIFY_SSL, default=DEFAULT_VERIFY_SSL)] = _BOOL_SELECTOR
|
||||
schema[req(CONF_USERNAME)] = _TEXT_SELECTOR
|
||||
schema[cred_key(CONF_PASSWORD)] = _PASSWORD_SELECTOR
|
||||
schema[cred_key(CONF_API_KEY)] = _PASSWORD_SELECTOR
|
||||
return vol.Schema(schema)
|
||||
|
||||
|
||||
# Schemas for different flow contexts
|
||||
# User flow: all fields required
|
||||
CONFIG_SCHEMA = _build_schema()
|
||||
# Reconfigure flow: keep existing credentials if not provided
|
||||
RECONFIGURE_SCHEMA = _build_schema(credentials_optional=True)
|
||||
# Discovery flow: host comes from discovery, user sets port/ssl
|
||||
DISCOVERY_SCHEMA = _build_schema(include_host=False)
|
||||
# Reauth flow: only credentials, connection settings preserved
|
||||
REAUTH_SCHEMA = _build_schema(
|
||||
include_host=False, include_connection=False, credentials_optional=True
|
||||
)
|
||||
|
||||
|
||||
async def async_local_user_documentation_url(hass: HomeAssistant) -> str:
|
||||
"""Get the documentation url for creating a local user."""
|
||||
@@ -178,19 +277,40 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
"""Confirm discovery."""
|
||||
errors: dict[str, str] = {}
|
||||
discovery_info = self._discovered_device
|
||||
|
||||
form_data = {
|
||||
CONF_HOST: discovery_info["direct_connect_domain"]
|
||||
or discovery_info["source_ip"],
|
||||
CONF_PORT: DEFAULT_PORT,
|
||||
CONF_VERIFY_SSL: bool(discovery_info["direct_connect_domain"]),
|
||||
CONF_USERNAME: "",
|
||||
CONF_PASSWORD: "",
|
||||
}
|
||||
|
||||
if user_input is not None:
|
||||
user_input[CONF_PORT] = DEFAULT_PORT
|
||||
# Merge user input with discovery info
|
||||
merged_input = {**form_data, **user_input}
|
||||
nvr_data = None
|
||||
if discovery_info["direct_connect_domain"]:
|
||||
user_input[CONF_HOST] = discovery_info["direct_connect_domain"]
|
||||
user_input[CONF_VERIFY_SSL] = True
|
||||
nvr_data, errors = await self._async_get_nvr_data(user_input)
|
||||
merged_input[CONF_HOST] = discovery_info["direct_connect_domain"]
|
||||
merged_input[CONF_VERIFY_SSL] = True
|
||||
nvr_data, errors = await self._async_get_nvr_data(merged_input)
|
||||
if not nvr_data or errors:
|
||||
user_input[CONF_HOST] = discovery_info["source_ip"]
|
||||
user_input[CONF_VERIFY_SSL] = False
|
||||
nvr_data, errors = await self._async_get_nvr_data(user_input)
|
||||
merged_input[CONF_HOST] = discovery_info["source_ip"]
|
||||
merged_input[CONF_VERIFY_SSL] = False
|
||||
nvr_data, errors = await self._async_get_nvr_data(merged_input)
|
||||
if nvr_data and not errors:
|
||||
return self._async_create_entry(nvr_data.display_name, user_input)
|
||||
return self._async_create_entry(nvr_data.display_name, merged_input)
|
||||
# Preserve user input for form re-display, but keep discovery info
|
||||
form_data = {
|
||||
CONF_HOST: merged_input[CONF_HOST],
|
||||
CONF_PORT: merged_input[CONF_PORT],
|
||||
CONF_VERIFY_SSL: merged_input[CONF_VERIFY_SSL],
|
||||
CONF_USERNAME: user_input.get(CONF_USERNAME, ""),
|
||||
CONF_PASSWORD: user_input.get(CONF_PASSWORD, ""),
|
||||
}
|
||||
if CONF_API_KEY in user_input:
|
||||
form_data[CONF_API_KEY] = user_input[CONF_API_KEY]
|
||||
|
||||
placeholders = {
|
||||
"name": discovery_info["hostname"]
|
||||
@@ -199,7 +319,6 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
"ip_address": discovery_info["source_ip"],
|
||||
}
|
||||
self.context["title_placeholders"] = placeholders
|
||||
user_input = user_input or {}
|
||||
return self.async_show_form(
|
||||
step_id="discovery_confirm",
|
||||
description_placeholders={
|
||||
@@ -208,14 +327,8 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
self.hass
|
||||
),
|
||||
},
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(
|
||||
CONF_USERNAME, default=user_input.get(CONF_USERNAME)
|
||||
): str,
|
||||
vol.Required(CONF_PASSWORD): str,
|
||||
vol.Required(CONF_API_KEY): str,
|
||||
}
|
||||
data_schema=self.add_suggested_values_to_schema(
|
||||
DISCOVERY_SCHEMA, form_data
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
@@ -232,7 +345,7 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
def _async_create_entry(self, title: str, data: dict[str, Any]) -> ConfigFlowResult:
|
||||
return self.async_create_entry(
|
||||
title=title,
|
||||
data={**data, CONF_ID: title},
|
||||
data={**_normalize_port(data), CONF_ID: title},
|
||||
options={
|
||||
CONF_DISABLE_RTSP: False,
|
||||
CONF_ALL_UPDATES: False,
|
||||
@@ -251,7 +364,7 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
public_api_session = async_get_clientsession(self.hass)
|
||||
|
||||
host = user_input[CONF_HOST]
|
||||
port = user_input.get(CONF_PORT, DEFAULT_PORT)
|
||||
port = int(user_input.get(CONF_PORT, DEFAULT_PORT))
|
||||
verify_ssl = user_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL)
|
||||
|
||||
protect = ProtectApiClient(
|
||||
@@ -261,7 +374,7 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
port=port,
|
||||
username=user_input[CONF_USERNAME],
|
||||
password=user_input[CONF_PASSWORD],
|
||||
api_key=user_input[CONF_API_KEY],
|
||||
api_key=user_input.get(CONF_API_KEY, ""),
|
||||
verify_ssl=verify_ssl,
|
||||
cache_dir=Path(self.hass.config.path(STORAGE_DIR, "unifiprotect")),
|
||||
config_dir=Path(self.hass.config.path(STORAGE_DIR, "unifiprotect")),
|
||||
@@ -290,14 +403,17 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
auth_user = bootstrap.users.get(bootstrap.auth_user_id)
|
||||
if auth_user and auth_user.cloud_account:
|
||||
errors["base"] = "cloud_user"
|
||||
try:
|
||||
await protect.get_meta_info()
|
||||
except NotAuthorized as ex:
|
||||
_LOGGER.debug(ex)
|
||||
errors[CONF_API_KEY] = "invalid_auth"
|
||||
except ClientError as ex:
|
||||
_LOGGER.error(ex)
|
||||
errors["base"] = "cannot_connect"
|
||||
|
||||
# Only validate API key if bootstrap succeeded
|
||||
if nvr_data and not errors:
|
||||
try:
|
||||
await protect.get_meta_info()
|
||||
except NotAuthorized as ex:
|
||||
_LOGGER.debug(ex)
|
||||
errors[CONF_API_KEY] = "invalid_auth"
|
||||
except ClientError as ex:
|
||||
_LOGGER.error(ex)
|
||||
errors["base"] = "cannot_connect"
|
||||
|
||||
return nvr_data, errors
|
||||
|
||||
@@ -313,16 +429,27 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
"""Confirm reauth."""
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
# prepopulate fields
|
||||
reauth_entry = self._get_reauth_entry()
|
||||
form_data = {**reauth_entry.data}
|
||||
form_data = _build_data_without_credentials(reauth_entry.data)
|
||||
|
||||
if user_input is not None:
|
||||
form_data.update(user_input)
|
||||
# Merge with existing config - empty credentials keep existing values
|
||||
merged_input = {
|
||||
**reauth_entry.data,
|
||||
**_filter_empty_credentials(user_input),
|
||||
}
|
||||
|
||||
# Clear stored session if credentials changed to force fresh authentication
|
||||
await _async_clear_session_if_credentials_changed(
|
||||
self.hass, reauth_entry, merged_input
|
||||
)
|
||||
|
||||
# validate login data
|
||||
_, errors = await self._async_get_nvr_data(form_data)
|
||||
_, errors = await self._async_get_nvr_data(merged_input)
|
||||
if not errors:
|
||||
return self.async_update_reload_and_abort(reauth_entry, data=form_data)
|
||||
return self.async_update_reload_and_abort(
|
||||
reauth_entry, data=_normalize_port(merged_input)
|
||||
)
|
||||
|
||||
self.context["title_placeholders"] = {
|
||||
"name": reauth_entry.title,
|
||||
@@ -335,14 +462,58 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
self.hass
|
||||
),
|
||||
},
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(
|
||||
CONF_USERNAME, default=form_data.get(CONF_USERNAME)
|
||||
): str,
|
||||
vol.Required(CONF_PASSWORD): str,
|
||||
vol.Required(CONF_API_KEY): str,
|
||||
}
|
||||
data_schema=self.add_suggested_values_to_schema(REAUTH_SCHEMA, form_data),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
async def async_step_reconfigure(
|
||||
self, user_input: dict[str, Any] | None = None
|
||||
) -> ConfigFlowResult:
|
||||
"""Handle reconfiguration of the integration."""
|
||||
errors: dict[str, str] = {}
|
||||
|
||||
reconfigure_entry = self._get_reconfigure_entry()
|
||||
form_data = _build_data_without_credentials(reconfigure_entry.data)
|
||||
|
||||
if user_input is not None:
|
||||
# Merge with existing config - empty credentials keep existing values
|
||||
merged_input = {
|
||||
**reconfigure_entry.data,
|
||||
**_filter_empty_credentials(user_input),
|
||||
}
|
||||
|
||||
# Clear stored session if credentials changed to force fresh authentication
|
||||
await _async_clear_session_if_credentials_changed(
|
||||
self.hass, reconfigure_entry, merged_input
|
||||
)
|
||||
|
||||
# validate login data
|
||||
nvr_data, errors = await self._async_get_nvr_data(merged_input)
|
||||
if nvr_data and not errors:
|
||||
new_unique_id = _async_unifi_mac_from_hass(nvr_data.mac)
|
||||
_LOGGER.debug(
|
||||
"Reconfigure: Current unique_id=%s, NVR MAC=%s, formatted=%s",
|
||||
reconfigure_entry.unique_id,
|
||||
nvr_data.mac,
|
||||
new_unique_id,
|
||||
)
|
||||
await self.async_set_unique_id(new_unique_id)
|
||||
self._abort_if_unique_id_mismatch(reason="wrong_nvr")
|
||||
|
||||
return self.async_update_reload_and_abort(
|
||||
reconfigure_entry,
|
||||
data=_normalize_port(merged_input),
|
||||
)
|
||||
|
||||
return self.async_show_form(
|
||||
step_id="reconfigure",
|
||||
description_placeholders={
|
||||
"local_user_documentation_url": await async_local_user_documentation_url(
|
||||
self.hass
|
||||
),
|
||||
},
|
||||
data_schema=self.add_suggested_values_to_schema(
|
||||
RECONFIGURE_SCHEMA, form_data
|
||||
),
|
||||
errors=errors,
|
||||
)
|
||||
@@ -362,7 +533,6 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
|
||||
return self._async_create_entry(nvr_data.display_name, user_input)
|
||||
|
||||
user_input = user_input or {}
|
||||
return self.async_show_form(
|
||||
step_id="user",
|
||||
description_placeholders={
|
||||
@@ -370,23 +540,7 @@ class ProtectFlowHandler(ConfigFlow, domain=DOMAIN):
|
||||
self.hass
|
||||
)
|
||||
},
|
||||
data_schema=vol.Schema(
|
||||
{
|
||||
vol.Required(CONF_HOST, default=user_input.get(CONF_HOST)): str,
|
||||
vol.Required(
|
||||
CONF_PORT, default=user_input.get(CONF_PORT, DEFAULT_PORT)
|
||||
): int,
|
||||
vol.Required(
|
||||
CONF_VERIFY_SSL,
|
||||
default=user_input.get(CONF_VERIFY_SSL, DEFAULT_VERIFY_SSL),
|
||||
): bool,
|
||||
vol.Required(
|
||||
CONF_USERNAME, default=user_input.get(CONF_USERNAME)
|
||||
): str,
|
||||
vol.Required(CONF_PASSWORD): str,
|
||||
vol.Required(CONF_API_KEY): str,
|
||||
}
|
||||
),
|
||||
data_schema=self.add_suggested_values_to_schema(CONFIG_SCHEMA, user_input),
|
||||
errors=errors,
|
||||
)
|
||||
|
||||
|
||||
@@ -3,7 +3,9 @@
|
||||
"abort": {
|
||||
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
|
||||
"discovery_started": "Discovery started",
|
||||
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]"
|
||||
"reauth_successful": "[%key:common::config_flow::abort::reauth_successful%]",
|
||||
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]",
|
||||
"wrong_nvr": "Connected to a different NVR than expected. If you replaced your hardware, please remove the old integration and add it again."
|
||||
},
|
||||
"error": {
|
||||
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
|
||||
@@ -17,12 +19,16 @@
|
||||
"data": {
|
||||
"api_key": "[%key:common::config_flow::data::api_key%]",
|
||||
"password": "[%key:common::config_flow::data::password%]",
|
||||
"username": "[%key:common::config_flow::data::username%]"
|
||||
"port": "[%key:common::config_flow::data::port%]",
|
||||
"username": "[%key:common::config_flow::data::username%]",
|
||||
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
|
||||
},
|
||||
"data_description": {
|
||||
"api_key": "[%key:component::unifiprotect::config::step::user::data_description::api_key%]",
|
||||
"password": "[%key:component::unifiprotect::config::step::user::data_description::password%]",
|
||||
"username": "[%key:component::unifiprotect::config::step::user::data_description::username%]"
|
||||
"port": "[%key:component::unifiprotect::config::step::user::data_description::port%]",
|
||||
"username": "[%key:component::unifiprotect::config::step::user::data_description::username%]",
|
||||
"verify_ssl": "[%key:component::unifiprotect::config::step::user::data_description::verify_ssl%]"
|
||||
},
|
||||
"description": "Do you want to set up {name} ({ip_address})? You will need a local user created in your UniFi OS Console to log in with. Ubiquiti Cloud users will not work. For more information: {local_user_documentation_url}",
|
||||
"title": "UniFi Protect discovered"
|
||||
@@ -30,20 +36,36 @@
|
||||
"reauth_confirm": {
|
||||
"data": {
|
||||
"api_key": "[%key:common::config_flow::data::api_key%]",
|
||||
"host": "IP/Host of UniFi Protect server",
|
||||
"password": "[%key:common::config_flow::data::password%]",
|
||||
"port": "[%key:common::config_flow::data::port%]",
|
||||
"username": "[%key:common::config_flow::data::username%]"
|
||||
},
|
||||
"data_description": {
|
||||
"api_key": "[%key:component::unifiprotect::config::step::user::data_description::api_key%]",
|
||||
"host": "[%key:component::unifiprotect::config::step::user::data_description::host%]",
|
||||
"password": "[%key:component::unifiprotect::config::step::user::data_description::password%]",
|
||||
"port": "[%key:component::unifiprotect::config::step::user::data_description::port%]",
|
||||
"api_key": "API key for your local user account. Leave empty to keep your existing API key.",
|
||||
"password": "Password for your local user account. Leave empty to keep your existing password.",
|
||||
"username": "[%key:component::unifiprotect::config::step::user::data_description::username%]"
|
||||
},
|
||||
"description": "Your credentials or API key seem to be missing or invalid. For instructions on how to create a local user or generate a new API key, please refer to the documentation: {local_user_documentation_url}",
|
||||
"title": "UniFi Protect reauth"
|
||||
"description": "Your credentials or API key seem to be missing or invalid. Leave password and API key empty to keep your existing credentials. For more information: {local_user_documentation_url}",
|
||||
"title": "Reauth UniFi Protect"
|
||||
},
|
||||
"reconfigure": {
|
||||
"data": {
|
||||
"api_key": "[%key:common::config_flow::data::api_key%]",
|
||||
"host": "[%key:common::config_flow::data::host%]",
|
||||
"password": "[%key:common::config_flow::data::password%]",
|
||||
"port": "[%key:common::config_flow::data::port%]",
|
||||
"username": "[%key:common::config_flow::data::username%]",
|
||||
"verify_ssl": "[%key:common::config_flow::data::verify_ssl%]"
|
||||
},
|
||||
"data_description": {
|
||||
"api_key": "[%key:component::unifiprotect::config::step::reauth_confirm::data_description::api_key%]",
|
||||
"host": "[%key:component::unifiprotect::config::step::user::data_description::host%]",
|
||||
"password": "[%key:component::unifiprotect::config::step::reauth_confirm::data_description::password%]",
|
||||
"port": "[%key:component::unifiprotect::config::step::user::data_description::port%]",
|
||||
"username": "[%key:component::unifiprotect::config::step::user::data_description::username%]",
|
||||
"verify_ssl": "[%key:component::unifiprotect::config::step::user::data_description::verify_ssl%]"
|
||||
},
|
||||
"description": "Update the configuration for your UniFi Protect device. Leave password and API key empty to keep your existing credentials. For more information: {local_user_documentation_url}",
|
||||
"title": "Reconfigure UniFi Protect"
|
||||
},
|
||||
"user": {
|
||||
"data": {
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Callable
|
||||
from collections.abc import Callable, Generator
|
||||
from datetime import datetime, timedelta
|
||||
from functools import partial
|
||||
from ipaddress import IPv4Address
|
||||
@@ -32,7 +32,15 @@ from uiprotect.data import (
|
||||
from uiprotect.websocket import WebsocketState
|
||||
|
||||
from homeassistant.components.unifiprotect.const import DOMAIN
|
||||
from homeassistant.const import CONF_API_KEY
|
||||
from homeassistant.components.unifiprotect.utils import _async_unifi_mac_from_hass
|
||||
from homeassistant.const import (
|
||||
CONF_API_KEY,
|
||||
CONF_HOST,
|
||||
CONF_PASSWORD,
|
||||
CONF_PORT,
|
||||
CONF_USERNAME,
|
||||
CONF_VERIFY_SSL,
|
||||
)
|
||||
from homeassistant.core import HomeAssistant
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
@@ -43,6 +51,14 @@ from tests.common import MockConfigEntry, load_fixture
|
||||
|
||||
MAC_ADDR = "aa:bb:cc:dd:ee:ff"
|
||||
|
||||
# Common test data constants
|
||||
DEFAULT_HOST = "1.1.1.1"
|
||||
DEFAULT_PORT = 443
|
||||
DEFAULT_VERIFY_SSL = False
|
||||
DEFAULT_USERNAME = "test-username"
|
||||
DEFAULT_PASSWORD = "test-password"
|
||||
DEFAULT_API_KEY = "test-api-key"
|
||||
|
||||
|
||||
@pytest.fixture(name="nvr")
|
||||
def mock_nvr():
|
||||
@@ -66,13 +82,13 @@ def mock_ufp_config_entry():
|
||||
return MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
"host": "1.1.1.1",
|
||||
"username": "test-username",
|
||||
"password": "test-password",
|
||||
CONF_API_KEY: "test-api-key",
|
||||
CONF_HOST: DEFAULT_HOST,
|
||||
CONF_USERNAME: DEFAULT_USERNAME,
|
||||
CONF_PASSWORD: DEFAULT_PASSWORD,
|
||||
CONF_API_KEY: DEFAULT_API_KEY,
|
||||
"id": "UnifiProtect",
|
||||
"port": 443,
|
||||
"verify_ssl": False,
|
||||
CONF_PORT: DEFAULT_PORT,
|
||||
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
|
||||
},
|
||||
version=2,
|
||||
)
|
||||
@@ -371,6 +387,78 @@ def fixed_now_fixture():
|
||||
return dt_util.utcnow()
|
||||
|
||||
|
||||
@pytest.fixture(name="ufp_reauth_entry")
|
||||
def mock_ufp_reauth_entry():
|
||||
"""Mock the unifiprotect config entry for reauth and reconfigure tests."""
|
||||
return MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_HOST: DEFAULT_HOST,
|
||||
CONF_USERNAME: DEFAULT_USERNAME,
|
||||
CONF_PASSWORD: DEFAULT_PASSWORD,
|
||||
CONF_API_KEY: DEFAULT_API_KEY,
|
||||
"id": "UnifiProtect",
|
||||
CONF_PORT: DEFAULT_PORT,
|
||||
CONF_VERIFY_SSL: DEFAULT_VERIFY_SSL,
|
||||
},
|
||||
unique_id=_async_unifi_mac_from_hass(MAC_ADDR),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="ufp_reauth_entry_alt")
|
||||
def mock_ufp_reauth_entry_alt():
|
||||
"""Mock the unifiprotect config entry with alternate port/SSL for reauth/reconfigure tests."""
|
||||
return MockConfigEntry(
|
||||
domain=DOMAIN,
|
||||
data={
|
||||
CONF_HOST: DEFAULT_HOST,
|
||||
CONF_USERNAME: DEFAULT_USERNAME,
|
||||
CONF_PASSWORD: DEFAULT_PASSWORD,
|
||||
CONF_API_KEY: DEFAULT_API_KEY,
|
||||
"id": "UnifiProtect",
|
||||
CONF_PORT: 8443,
|
||||
CONF_VERIFY_SSL: True,
|
||||
},
|
||||
unique_id=_async_unifi_mac_from_hass(MAC_ADDR),
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(name="mock_setup")
|
||||
def mock_setup_fixture() -> Generator[AsyncMock]:
|
||||
"""Mock async_setup and async_setup_entry to prevent reload issues in tests."""
|
||||
with (
|
||||
patch(
|
||||
"homeassistant.components.unifiprotect.async_setup",
|
||||
return_value=True,
|
||||
),
|
||||
patch(
|
||||
"homeassistant.components.unifiprotect.async_setup_entry",
|
||||
return_value=True,
|
||||
) as mock,
|
||||
):
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture(name="mock_api_bootstrap")
|
||||
def mock_api_bootstrap_fixture(bootstrap: Bootstrap):
|
||||
"""Mock the ProtectApiClient.get_bootstrap method."""
|
||||
with patch(
|
||||
"homeassistant.components.unifiprotect.config_flow.ProtectApiClient.get_bootstrap",
|
||||
return_value=bootstrap,
|
||||
) as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture(name="mock_api_meta_info")
|
||||
def mock_api_meta_info_fixture():
|
||||
"""Mock the ProtectApiClient.get_meta_info method."""
|
||||
with patch(
|
||||
"homeassistant.components.unifiprotect.config_flow.ProtectApiClient.get_meta_info",
|
||||
return_value=None,
|
||||
) as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
@pytest.fixture(name="cloud_account")
|
||||
def cloud_account() -> CloudAccount:
|
||||
"""Return UI Cloud Account."""
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user