Compare commits

..

9 Commits

Author SHA1 Message Date
dependabot[bot] 1d42402735 Bump actions/setup-python from 6.2.0 to 6.3.0
Bumps [actions/setup-python](https://github.com/actions/setup-python) from 6.2.0 to 6.3.0.
- [Release notes](https://github.com/actions/setup-python/releases)
- [Commits](https://github.com/actions/setup-python/compare/a309ff8b426b58ec0e2a45f0f869d46889d02405...ece7cb06caefa5fff74198d8649806c4678c61a1)

---
updated-dependencies:
- dependency-name: actions/setup-python
  dependency-version: 6.3.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-06-30 06:04:06 +00:00
Erik Montnemery a0d96a2c62 Remove no longer needed sleep from HomeAssistant.async_start (#175152) 2026-06-30 06:36:25 +02:00
renovate[bot] eb9ebd17b6 Update coverage to 7.14.3 (#175155) 2026-06-30 06:32:50 +02:00
Robert Resch 5ea38fcc07 Bump uv to 0.11.25 (#175153) 2026-06-30 06:27:59 +02:00
Robert Resch 15ef228cfa Bump wheels and base image to 2026.07.0 to use alpine 3.24 (#175133) 2026-06-29 23:41:01 +02:00
Arie Catsman e815c9f0cc bump pyenphase to 3.0.1 (#175141) 2026-06-29 23:31:43 +03:00
Simone Chemelli 434b3ca309 Bump librouteros to 4.1.1 (#175116) 2026-06-29 20:04:32 +01:00
Markus Tuominen a557e96c53 Add walk_checker helper to deduplicate pylint rule tests (#175113) 2026-06-29 20:41:07 +02:00
Jan Bouwhuis 324c95140b Refactor MQTT config entry (#173929)
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Ariel Ebersberger <31776703+justanotherariel@users.noreply.github.com>
2026-06-29 20:07:26 +02:00
125 changed files with 1122 additions and 1813 deletions
+4 -4
View File
@@ -14,7 +14,7 @@ env:
UV_HTTP_TIMEOUT: 60
UV_SYSTEM_PYTHON: "true"
# Base image version from https://github.com/home-assistant/docker
BASE_IMAGE_VERSION: "2026.05.0"
BASE_IMAGE_VERSION: "2026.07.0"
ARCHITECTURES: '["amd64", "aarch64"]'
permissions: {}
@@ -43,7 +43,7 @@ jobs:
persist-credentials: false
- name: Set up Python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
@@ -130,7 +130,7 @@ jobs:
- name: Set up Python
if: needs.init.outputs.channel == 'dev'
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
@@ -474,7 +474,7 @@ jobs:
persist-credentials: false
- name: Set up Python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
@@ -41,7 +41,7 @@ jobs:
with:
persist-credentials: false
- name: Set up Python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
+13 -13
View File
@@ -346,7 +346,7 @@ jobs:
persist-credentials: false
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version: ${{ matrix.python-version }}
check-latest: true
@@ -480,7 +480,7 @@ jobs:
version: ${{ env.APT_CACHE_VERSION }}
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -517,7 +517,7 @@ jobs:
persist-credentials: false
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -553,7 +553,7 @@ jobs:
persist-credentials: false
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -608,7 +608,7 @@ jobs:
persist-credentials: false
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version: ${{ matrix.python-version }}
check-latest: true
@@ -659,7 +659,7 @@ jobs:
persist-credentials: false
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -712,7 +712,7 @@ jobs:
persist-credentials: false
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -763,7 +763,7 @@ jobs:
persist-credentials: false
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -840,7 +840,7 @@ jobs:
execute_install_scripts: true
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -905,7 +905,7 @@ jobs:
execute_install_scripts: true
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version: ${{ matrix.python-version }}
check-latest: true
@@ -1047,7 +1047,7 @@ jobs:
execute_install_scripts: true
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version: ${{ matrix.python-version }}
check-latest: true
@@ -1203,7 +1203,7 @@ jobs:
version: ${{ env.APT_CACHE_VERSION }}
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version: ${{ matrix.python-version }}
check-latest: true
@@ -1371,7 +1371,7 @@ jobs:
execute_install_scripts: true
- name: Set up Python ${{ matrix.python-version }}
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version: ${{ matrix.python-version }}
check-latest: true
+1 -1
View File
@@ -27,7 +27,7 @@ jobs:
persist-credentials: false
- name: Set up Python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
+3 -3
View File
@@ -35,7 +35,7 @@ jobs:
- name: Set up Python
id: python
uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
uses: actions/setup-python@ece7cb06caefa5fff74198d8649806c4678c61a1 # v6.3.0
with:
python-version-file: ".python-version"
check-latest: true
@@ -137,7 +137,7 @@ jobs:
sed -i "/uv/d" requirements_diff.txt
- name: Build wheels
uses: home-assistant/wheels@34957438948e0b3dcde73c77750643dadae594f5 # 2026.06.0
uses: home-assistant/wheels@9e17ab1ed5c4c79d8b61e29fa63de25ca2710716 # 2026.07.0
with:
abi: ${{ matrix.abi }}
tag: musllinux_1_2
@@ -195,7 +195,7 @@ jobs:
sed -i "/uv/d" requirements_diff.txt
- name: Build wheels
uses: home-assistant/wheels@34957438948e0b3dcde73c77750643dadae594f5 # 2026.06.0
uses: home-assistant/wheels@9e17ab1ed5c4c79d8b61e29fa63de25ca2710716 # 2026.07.0
with:
abi: ${{ matrix.abi }}
tag: musllinux_1_2
-8
View File
@@ -1,9 +1 @@
"""Init file for Home Assistant."""
from probatio.compat import install_as_voluptuous
# Probatio replaces voluptuous as the validation engine. Custom integrations and a
# few dependencies still import voluptuous directly, so alias it to probatio in
# sys.modules before anything imports it. This must run before the first
# `import voluptuous`, hence the package __init__.
install_as_voluptuous()
@@ -6,8 +6,8 @@ import logging
from typing import TYPE_CHECKING, Any, cast, override
import anthropic
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components.zone import ENTITY_ID_HOME
from homeassistant.config_entries import (
+1 -1
View File
@@ -103,8 +103,8 @@ from anthropic.types.web_fetch_tool_result_block import (
from anthropic.types.web_fetch_tool_result_block_param import (
Content as WebFetchToolResultBlockParamContentParam,
)
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components import conversation
from homeassistant.config_entries import ConfigSubentry
+2 -2
View File
@@ -74,8 +74,8 @@ from ipaddress import ip_address
from typing import TYPE_CHECKING, Any, cast
from aiohttp import web
from probatio import serialize
import voluptuous as vol
import voluptuous_serialize
from homeassistant import data_entry_flow
from homeassistant.auth import AuthManagerFlowManager, InvalidAuthError
@@ -263,7 +263,7 @@ def _prepare_result_json(result: AuthFlowResult) -> dict[str, Any]:
if (schema := result["data_schema"]) is None:
data["data_schema"] = []
else:
data["data_schema"] = serialize(schema)
data["data_schema"] = voluptuous_serialize.convert(schema)
return data
@@ -3,8 +3,8 @@
import logging
from typing import Any, override
from probatio import serialize
import voluptuous as vol
import voluptuous_serialize
from homeassistant import data_entry_flow
from homeassistant.components import websocket_api
@@ -153,6 +153,6 @@ def _prepare_result_json(result: data_entry_flow.FlowResult) -> dict[str, Any]:
if (schema := result["data_schema"]) is None:
data["data_schema"] = []
else:
data["data_schema"] = serialize(schema)
data["data_schema"] = voluptuous_serialize.convert(schema)
return data
+1 -1
View File
@@ -42,8 +42,8 @@ from openai.types.responses.response_input_param import (
ImageGenerationCall as ImageGenerationCallParam,
)
from openai.types.responses.response_output_item import ImageGenerationCall
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components import conversation
from homeassistant.config_entries import ConfigEntry
@@ -9,8 +9,8 @@ import logging
from types import ModuleType
from typing import TYPE_CHECKING, Any, Literal, overload
from probatio import serialize
import voluptuous as vol
import voluptuous_serialize
from homeassistant.components import websocket_api
from homeassistant.components.websocket_api import ActiveConnection
@@ -318,7 +318,7 @@ async def _async_get_device_automation_capabilities(
if (extra_fields := capabilities.get("extra_fields")) is None:
capabilities["extra_fields"] = []
else:
capabilities["extra_fields"] = serialize(
capabilities["extra_fields"] = voluptuous_serialize.convert(
extra_fields, custom_serializer=cv.custom_serializer
)
@@ -8,7 +8,7 @@
"iot_class": "local_polling",
"loggers": ["pyenphase"],
"quality_scale": "platinum",
"requirements": ["pyenphase==3.0.0"],
"requirements": ["pyenphase==3.0.1"],
"zeroconf": [
{
"type": "_enphase-envoy._tcp.local."
@@ -31,8 +31,8 @@ from google.genai.types import (
Tool,
ToolListUnion,
)
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components import conversation
from homeassistant.config_entries import ConfigSubentry
@@ -2,13 +2,13 @@
from typing import Any, TypedDict
from probatio import serialize
from pyinsteon import async_close, async_connect, devices
from pyinsteon.address import Address
from pyinsteon.aldb.aldb_record import ALDBRecord
from pyinsteon.constants import LinkStatus
from pyinsteon.managers.link_manager import get_broken_links
import voluptuous as vol
import voluptuous_serialize
from homeassistant.components import websocket_api
from homeassistant.config_entries import ConfigEntry
@@ -212,10 +212,12 @@ async def websocket_get_modem_schema(
config_data = config_entry.data
if device := config_data.get(CONF_DEVICE):
ports = await async_get_usb_ports(hass=hass)
plm_schema = serialize(build_plm_schema(ports=ports, device=device))
plm_schema = voluptuous_serialize.convert(
build_plm_schema(ports=ports, device=device)
)
connection.send_result(msg[ID], plm_schema)
else:
hub_schema = serialize(build_hub_schema(**config_data))
hub_schema = voluptuous_serialize.convert(build_hub_schema(**config_data))
connection.send_result(msg[ID], hub_schema)
@@ -2,7 +2,6 @@
from typing import Any
from probatio import serialize
from pyinsteon import devices
from pyinsteon.config import (
LOAD_BUTTON,
@@ -19,6 +18,7 @@ from pyinsteon.constants import (
)
from pyinsteon.device_types.device_base import Device
import voluptuous as vol
import voluptuous_serialize
from homeassistant.components import websocket_api
from homeassistant.core import HomeAssistant
@@ -43,26 +43,26 @@ RELAY_MODES = [str(RelayMode(v)).lower() for v in list(RelayMode)]
def _bool_schema(name):
return serialize(vol.Schema({vol.Required(name): bool}))[0]
return voluptuous_serialize.convert(vol.Schema({vol.Required(name): bool}))[0]
def _byte_schema(name):
return serialize(vol.Schema({vol.Required(name): cv.byte}))[0]
return voluptuous_serialize.convert(vol.Schema({vol.Required(name): cv.byte}))[0]
def _float_schema(name):
return serialize(vol.Schema({vol.Required(name): float}))[0]
return voluptuous_serialize.convert(vol.Schema({vol.Required(name): float}))[0]
def _list_schema(name, values):
return serialize(
return voluptuous_serialize.convert(
vol.Schema({vol.Required(name): vol.In(values)}),
custom_serializer=cv.custom_serializer,
)[0]
def _multi_select_schema(name, values):
return serialize(
return voluptuous_serialize.convert(
vol.Schema({vol.Optional(name): cv.multi_select(values)}),
custom_serializer=cv.custom_serializer,
)[0]
@@ -70,7 +70,7 @@ def _multi_select_schema(name, values):
def _read_only_schema(name, value):
"""Return a constant value schema."""
return serialize(vol.Schema({vol.Required(name): value}))[0]
return voluptuous_serialize.convert(vol.Schema({vol.Required(name): value}))[0]
def get_schema(prop, name, groups):
@@ -1,6 +1,6 @@
"""Selectors for KNX."""
from collections.abc import Iterable
from collections.abc import Hashable, Iterable
from enum import Enum
from typing import Any, override
@@ -23,7 +23,7 @@ class AllSerializeFirst(vol.All):
class KNXSelectorBase:
"""Base class for KNX selectors supporting optional nested schemas."""
schema: vol.Schema | vol.Any | vol.All | GroupSelectSchema
schema: vol.Schema | vol.Any | vol.All
selector_type: str
# mark if self.schema should be serialized to `schema` key
serialize_subschema: bool = False
@@ -108,35 +108,30 @@ class GroupSelectOption(KNXSelectorBase):
}
class GroupSelectSchema:
"""Use the first validated value, like ``vol.Any``.
class GroupSelectSchema(vol.Any):
"""Use the first validated value.
A standalone validator rather than a ``vol.Any`` subclass, so it does not
reach into validation-engine internals. On total failure it raises the most
useful branch error (the first that is not an unknown-key error, else the
first) so the UI marks a real problem instead of an extra key.
This is a version of vol.Any with custom error handling to
show proper invalid markers for sub-schema items in the UI.
"""
def __init__(self, *options: vol.Schemable, msg: str | None = None) -> None:
"""Store the options to try in order."""
self.validators = options
self.msg = msg
self._compiled = [vol.Schema(option) for option in options]
def __call__(self, data: Any) -> Any:
"""Return the first option that validates, else raise the best error."""
@override
def _exec(self, funcs: Iterable, v: Any, path: list[Hashable] | None = None) -> Any:
"""Execute the validation functions."""
errors: list[vol.Invalid] = []
for option in self._compiled:
for func in funcs:
try:
return option(data)
except vol.Invalid as err:
errors.append(err)
if path is None:
return func(v)
return func(path, v)
except vol.Invalid as e:
errors.append(e)
if errors:
raise next(
(err for err in errors if err.code != "extra_keys_not_allowed"),
(err for err in errors if "extra keys not allowed" not in err.msg),
errors[0],
)
raise vol.AnyInvalid(self.msg or "no valid value found")
raise vol.AnyInvalid(self.msg or "no valid value found", path=path)
class GroupSelect(KNXSelectorBase):
@@ -2,7 +2,8 @@
from typing import Any, cast
from probatio import UNSUPPORTED, serialize as convert
import voluptuous as vol
from voluptuous_serialize import UNSUPPORTED, UnsupportedType, convert
from homeassistant.const import Platform
from homeassistant.helpers import selector
@@ -11,7 +12,9 @@ from .entity_store_schema import KNX_SCHEMA_FOR_PLATFORM
from .knx_selector import AllSerializeFirst, GroupSelectSchema, KNXSelectorBase
def knx_serializer(schema: Any) -> Any:
def knx_serializer(
schema: vol.Schema,
) -> dict[str, Any] | list[dict[str, Any]] | UnsupportedType:
"""Serialize KNX schema."""
if isinstance(schema, GroupSelectSchema):
return [
@@ -40,8 +43,5 @@ def get_serialized_schema(
) -> dict[str, Any] | list[dict[str, Any]] | None:
"""Get the schema for a specific platform."""
if knx_schema := KNX_SCHEMA_FOR_PLATFORM.get(platform):
return cast(
"dict[str, Any] | list[dict[str, Any]]",
convert(knx_schema, custom_serializer=knx_serializer),
)
return convert(knx_schema, custom_serializer=knx_serializer)
return None
+1 -1
View File
@@ -12,8 +12,8 @@ from mcp import McpError
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
from mcp.client.streamable_http import streamable_http_client
from probatio import from_openapi as convert_to_voluptuous
import voluptuous as vol
from voluptuous_openapi import convert_to_voluptuous
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_URL
@@ -15,9 +15,9 @@ from typing import Any, cast
from mcp import types
from mcp.server import Server
from mcp.server.lowlevel.helper_types import ReadResourceContents
from probatio import to_openapi as convert
from pydantic import AnyUrl
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
@@ -227,8 +227,8 @@ class MikrotikData:
_LOGGER.debug("Running command %s", cmd)
try:
if params:
return list(self.api(cmd=cmd, **params))
return list(self.api(cmd=cmd))
return list(self.api(cmd, **params))
return list(self.api(cmd))
except (
librouteros.exceptions.ConnectionClosed,
OSError,
@@ -318,8 +318,7 @@ def get_api(entry: dict[str, Any]) -> librouteros.Api:
"""Connect to Mikrotik hub."""
_LOGGER.debug("Connecting to Mikrotik hub [%s]", entry[CONF_HOST])
_login_method = (login_plain, login_token)
kwargs = {"login_methods": _login_method, "port": entry["port"], "encoding": "utf8"}
kwargs = {"port": entry["port"], "encoding": "utf8"}
if entry[CONF_VERIFY_SSL]:
ssl_context = ssl.create_default_context()
@@ -328,22 +327,30 @@ def get_api(entry: dict[str, Any]) -> librouteros.Api:
_ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = _ssl_wrapper
try:
api = librouteros.connect(
entry[CONF_HOST],
entry[CONF_USERNAME],
entry[CONF_PASSWORD],
**kwargs,
)
except (
librouteros.exceptions.LibRouterosError,
OSError,
TimeoutError,
) as api_error:
_LOGGER.error("Mikrotik %s error: %s", entry[CONF_HOST], api_error)
if "invalid user name or password" in str(api_error):
raise LoginError from api_error
raise CannotConnect from api_error
_error: Exception | None = None
for method in (login_plain, login_token):
try:
kwargs["login_method"] = method
api = librouteros.connect(
entry[CONF_HOST],
entry[CONF_USERNAME],
entry[CONF_PASSWORD],
**kwargs,
)
_error = None
break
except (
librouteros.exceptions.LibRouterosError,
OSError,
TimeoutError,
) as api_error:
_error = api_error
if _error is not None:
_LOGGER.error("Mikrotik %s error: %s", entry[CONF_HOST], _error)
if "invalid user name or password" in str(_error):
raise LoginError from _error
raise CannotConnect from _error
_LOGGER.debug("Connected to %s successfully", entry[CONF_HOST])
return api
@@ -7,5 +7,5 @@
"integration_type": "device",
"iot_class": "local_polling",
"loggers": ["librouteros"],
"requirements": ["librouteros==3.2.1"]
"requirements": ["librouteros==4.1.1"]
}
@@ -14,7 +14,6 @@ from aiohttp.web import HTTPBadRequest, Request, Response, json_response
from nacl.exceptions import CryptoError
from nacl.secret import SecretBox
import voluptuous as vol
from voluptuous.humanize import humanize_error
from homeassistant.components import (
camera,
@@ -160,7 +159,7 @@ def validate_schema(schema):
try:
data = schema(data)
except vol.Invalid as ex:
err = humanize_error(data, ex)
err = vol.humanize.humanize_error(data, ex)
_LOGGER.error("Received invalid webhook payload: %s", err)
return empty_okay_response()
@@ -201,7 +200,7 @@ async def handle_webhook(
try:
req_data = WEBHOOK_PAYLOAD_SCHEMA(req_data)
except vol.Invalid as ex:
err = humanize_error(req_data, ex)
err = vol.humanize.humanize_error(req_data, ex)
_LOGGER.error(
"Received invalid webhook from %s with payload: %s", device_name, err
)
@@ -649,7 +648,7 @@ async def webhook_update_sensor_states(
try:
sensor = SENSOR_SCHEMA_FULL(sensor)
except vol.Invalid as err:
err_msg = humanize_error(sensor, err)
err_msg = vol.humanize.humanize_error(sensor, err)
_LOGGER.error(
"Received invalid sensor payload from %s for %s: %s",
device_name,
+1 -2
View File
@@ -71,7 +71,6 @@ from .const import (
DEFAULT_QOS,
DEFAULT_TRANSPORT,
DEFAULT_WILL,
DEFAULT_WS_HEADERS,
DEFAULT_WS_PATH,
DOMAIN,
MQTT_CONNECTION_STATE,
@@ -414,7 +413,7 @@ class MqttClientSetup:
tls_insecure = config.get(CONF_TLS_INSECURE)
if transport == TRANSPORT_WEBSOCKETS:
ws_path: str = config.get(CONF_WS_PATH, DEFAULT_WS_PATH)
ws_headers: dict[str, str] = config.get(CONF_WS_HEADERS, DEFAULT_WS_HEADERS)
ws_headers: dict[str, str] = config.get(CONF_WS_HEADERS, {})
self._client.ws_set_options(ws_path, ws_headers)
if certificate is not None:
self._client.tls_set(
+251 -365
View File
@@ -373,7 +373,6 @@ from .const import (
DEFAULT_CLIMATE_INITIAL_TEMPERATURE,
DEFAULT_DISCOVERY,
DEFAULT_ENCODING,
DEFAULT_KEEPALIVE,
DEFAULT_ON_COMMAND_TYPE,
DEFAULT_PAYLOAD_ARM_AWAY,
DEFAULT_PAYLOAD_ARM_CUSTOM_BYPASS,
@@ -414,7 +413,6 @@ from .const import (
DEFAULT_TILT_OPEN_POSITION,
DEFAULT_TRANSPORT,
DEFAULT_WILL,
DEFAULT_WS_PATH,
DOMAIN,
REMOTE_CODE,
REMOTE_CODE_TEXT,
@@ -441,7 +439,7 @@ ADDON_SETUP_TIMEOUT_ROUNDS = 5
CONF_CLIENT_KEY_PASSWORD = "client_key_password"
ADVANCED_OPTIONS = "advanced_options"
OTHER_SETTINGS = "other_settings"
SET_CA_CERT = "set_ca_cert"
SET_CLIENT_CERT = "set_client_cert"
@@ -1124,7 +1122,7 @@ def validate_light_platform_config(user_data: dict[str, Any]) -> dict[str, str]:
if user_data.get(CONF_MIN_KELVIN, DEFAULT_MIN_KELVIN) >= user_data.get(
CONF_MAX_KELVIN, DEFAULT_MAX_KELVIN
):
errors["other_settings"] = "max_below_min_kelvin"
errors[OTHER_SETTINGS] = "max_below_min_kelvin"
return errors
@@ -1506,7 +1504,7 @@ PLATFORM_ENTITY_FIELDS: dict[Platform, dict[str, PlatformField]] = {
selector=SUGGESTED_DISPLAY_PRECISION_SELECTOR,
required=False,
validator=cv.positive_int,
section="other_settings",
section=OTHER_SETTINGS,
),
CONF_OPTIONS: PlatformField(
selector=OPTIONS_SELECTOR,
@@ -1678,13 +1676,13 @@ PLATFORM_MQTT_FIELDS: dict[Platform, dict[str, PlatformField]] = {
selector=TIMEOUT_SELECTOR,
required=False,
validator=cv.positive_int,
section="other_settings",
section=OTHER_SETTINGS,
),
CONF_OFF_DELAY: PlatformField(
selector=TIMEOUT_SELECTOR,
required=False,
validator=cv.positive_int,
section="other_settings",
section=OTHER_SETTINGS,
),
},
Platform.BUTTON: {
@@ -3125,7 +3123,7 @@ PLATFORM_MQTT_FIELDS: dict[Platform, dict[str, PlatformField]] = {
default=False,
validator=cv.boolean,
conditions=({CONF_SCHEMA: "json"},),
section="other_settings",
section=OTHER_SETTINGS,
),
CONF_FLASH_TIME_SHORT: PlatformField(
selector=FLASH_TIME_SELECTOR,
@@ -3133,7 +3131,7 @@ PLATFORM_MQTT_FIELDS: dict[Platform, dict[str, PlatformField]] = {
validator=cv.positive_int,
default=2,
conditions=({CONF_SCHEMA: "json"},),
section="other_settings",
section=OTHER_SETTINGS,
),
CONF_FLASH_TIME_LONG: PlatformField(
selector=FLASH_TIME_SELECTOR,
@@ -3141,7 +3139,7 @@ PLATFORM_MQTT_FIELDS: dict[Platform, dict[str, PlatformField]] = {
validator=cv.positive_int,
default=10,
conditions=({CONF_SCHEMA: "json"},),
section="other_settings",
section=OTHER_SETTINGS,
),
CONF_TRANSITION: PlatformField(
selector=BOOLEAN_SELECTOR,
@@ -3149,21 +3147,21 @@ PLATFORM_MQTT_FIELDS: dict[Platform, dict[str, PlatformField]] = {
default=False,
validator=cv.boolean,
conditions=({CONF_SCHEMA: "json"},),
section="other_settings",
section=OTHER_SETTINGS,
),
CONF_MAX_KELVIN: PlatformField(
selector=KELVIN_SELECTOR,
required=False,
validator=cv.positive_int,
default=DEFAULT_MAX_KELVIN,
section="other_settings",
section=OTHER_SETTINGS,
),
CONF_MIN_KELVIN: PlatformField(
selector=KELVIN_SELECTOR,
required=False,
validator=cv.positive_int,
default=DEFAULT_MIN_KELVIN,
section="other_settings",
section=OTHER_SETTINGS,
),
},
Platform.LOCK: {
@@ -3372,7 +3370,7 @@ PLATFORM_MQTT_FIELDS: dict[Platform, dict[str, PlatformField]] = {
selector=TIMEOUT_SELECTOR,
required=False,
validator=cv.positive_int,
section="other_settings",
section=OTHER_SETTINGS,
),
},
Platform.SIREN: {
@@ -3798,10 +3796,10 @@ PLATFORM_MQTT_FIELDS: dict[Platform, dict[str, PlatformField]] = {
MQTT_DEVICE_PLATFORM_FIELDS = {
CONF_NAME: PlatformField(selector=TEXT_SELECTOR, required=True),
CONF_SW_VERSION: PlatformField(
selector=TEXT_SELECTOR, required=False, section="other_settings"
selector=TEXT_SELECTOR, required=False, section=OTHER_SETTINGS
),
CONF_HW_VERSION: PlatformField(
selector=TEXT_SELECTOR, required=False, section="other_settings"
selector=TEXT_SELECTOR, required=False, section=OTHER_SETTINGS
),
CONF_MODEL: PlatformField(selector=TEXT_SELECTOR, required=False),
CONF_MODEL_ID: PlatformField(selector=TEXT_SELECTOR, required=False),
@@ -4036,24 +4034,22 @@ def subentry_schema_default_data_from_fields(
@callback
def update_password_from_user_input(
entry_password: str | None, user_input: dict[str, Any]
) -> dict[str, Any]:
) -> None:
"""Update the password if the entry has been updated.
As we want to avoid reflecting the stored password in the UI,
we replace the suggested value in the UI with a sentitel,
and we change it back here if it was changed.
"""
substituted_used_data = dict(user_input)
# Take out the password submitted
user_password: str | None = substituted_used_data.pop(CONF_PASSWORD, None)
user_password: str | None = user_input.pop(CONF_PASSWORD, None)
# Only add the password if it has changed.
# If the sentinel password is submitted, we replace that with our current
# password from the config entry data.
password_changed = user_password is not None and user_password != PWD_NOT_CHANGED
password = user_password if password_changed else entry_password
if password is not None:
substituted_used_data[CONF_PASSWORD] = password
return substituted_used_data
user_input[CONF_PASSWORD] = password
REAUTH_SCHEMA = vol.Schema(
@@ -4063,6 +4059,35 @@ REAUTH_SCHEMA = vol.Schema(
}
)
OTHER_SETTINGS_SCHEMA = vol.Schema(
{
vol.Optional(CONF_CLIENT_ID): TEXT_SELECTOR,
vol.Optional(CONF_KEEPALIVE): KEEPALIVE_SELECTOR,
vol.Required(SET_CLIENT_CERT): BOOLEAN_SELECTOR,
vol.Optional(CONF_CLIENT_CERT): CERT_UPLOAD_SELECTOR,
vol.Optional(CONF_CLIENT_KEY): CERT_KEY_UPLOAD_SELECTOR,
vol.Optional(CONF_CLIENT_KEY_PASSWORD): PASSWORD_SELECTOR,
vol.Required(SET_CA_CERT): BROKER_VERIFICATION_SELECTOR,
vol.Optional(CONF_CERTIFICATE): CA_CERT_UPLOAD_SELECTOR,
vol.Optional(CONF_TLS_INSECURE): BOOLEAN_SELECTOR,
vol.Required(CONF_TRANSPORT, default=DEFAULT_TRANSPORT): TRANSPORT_SELECTOR,
vol.Optional(CONF_WS_PATH): TEXT_SELECTOR,
vol.Optional(CONF_WS_HEADERS): WS_HEADERS_SELECTOR,
}
)
CONFIG_DATAFLOW_SCHEMA = vol.Schema(
{
vol.Required(CONF_BROKER): TEXT_SELECTOR,
vol.Required(CONF_PORT, default=DEFAULT_PORT): PORT_SELECTOR,
vol.Required(CONF_PROTOCOL, default=DEFAULT_PROTOCOL): PROTOCOL_SELECTOR,
vol.Optional(CONF_USERNAME): TEXT_SELECTOR,
vol.Optional(CONF_PASSWORD): PASSWORD_SELECTOR,
vol.Required(OTHER_SETTINGS): section(
OTHER_SETTINGS_SCHEMA, SectionConfig({"collapsed": True})
),
}
)
class FlowHandler(ConfigFlow, domain=DOMAIN):
"""Handle a config flow."""
@@ -4072,24 +4097,26 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
_hassio_discovery: dict[str, Any] | None = None
_addon_manager: AddonManager
last_uploaded: dict[str, Any]
def __init__(self) -> None:
"""Set up flow instance."""
self.install_task: asyncio.Task | None = None
self.start_task: asyncio.Task | None = None
self.last_uploaded = {}
@override
@classmethod
@callback
@override
def async_get_supported_subentry_types(
cls, config_entry: ConfigEntry
) -> dict[str, type[ConfigSubentryFlow]]:
"""Return subentries supported by this handler."""
return {CONF_DEVICE: MQTTSubentryFlowHandler}
@override
@staticmethod
@callback
@override
def async_get_options_flow(
config_entry: ConfigEntry,
) -> MQTTOptionsFlowHandler:
@@ -4310,8 +4337,9 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
reauth_entry = self._get_reauth_entry()
if user_input:
substituted_used_data = update_password_from_user_input(
reauth_entry.data.get(CONF_PASSWORD), user_input
substituted_used_data = deepcopy(user_input)
update_password_from_user_input(
reauth_entry.data.get(CONF_PASSWORD), substituted_used_data
)
new_entry_data = {**reauth_entry.data, **substituted_used_data}
if await self.hass.async_add_executor_job(
@@ -4335,49 +4363,76 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
errors=errors,
)
@callback
def async_get_entry_defaults(self) -> dict[str, Any]:
"""Load the default settings from the entry."""
data = self._get_reconfigure_entry().data
other_settings: dict[str, Any] = {
key.schema: data[key.schema]
for key in OTHER_SETTINGS_SCHEMA.schema
if key in data
}
other_settings[SET_CLIENT_CERT] = (CONF_CLIENT_CERT in other_settings) and (
CONF_CLIENT_KEY in other_settings
)
other_settings.pop(CONF_CLIENT_CERT, None)
other_settings.pop(CONF_CLIENT_KEY, None)
conf_cert = other_settings.pop(CONF_CERTIFICATE, None)
other_settings[SET_CA_CERT] = (
"auto"
if conf_cert == "auto"
else "custom"
if conf_cert is not None
else "off"
)
if CONF_WS_HEADERS in other_settings:
other_settings[CONF_WS_HEADERS] = json_dumps(
other_settings.pop(CONF_WS_HEADERS)
)
settings: dict[str, Any] = {
key.schema: data[key.schema]
for key in CONFIG_DATAFLOW_SCHEMA.schema
if key in data
}
settings[OTHER_SETTINGS] = other_settings
if CONF_PASSWORD in settings:
# Hide entry password
settings[CONF_PASSWORD] = PWD_NOT_CHANGED
return settings
async def async_step_broker(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm the setup."""
errors: dict[str, str] = {}
fields: OrderedDict[Any, Any] = OrderedDict()
validated_user_input: dict[str, Any] = {}
schema = CONFIG_DATAFLOW_SCHEMA
entry_config_update: dict[str, Any] = {}
entry_defaults: dict[str, Any] | None = None
if is_reconfigure := (self.source == SOURCE_RECONFIGURE):
reconfigure_entry = self._get_reconfigure_entry()
if await async_get_broker_settings(
entry_defaults = self.async_get_entry_defaults()
if await async_validate_broker_settings(
self,
fields,
reconfigure_entry.data if is_reconfigure else None,
user_input,
validated_user_input,
entry_config_update,
errors,
):
if is_reconfigure:
validated_user_input = update_password_from_user_input(
reconfigure_entry.data.get(CONF_PASSWORD), validated_user_input
return self.async_update_and_abort(
reconfigure_entry,
data=entry_config_update,
)
can_connect = await self.hass.async_add_executor_job(
try_connection,
validated_user_input,
return self.async_create_entry(
title=entry_config_update[CONF_BROKER],
data=entry_config_update,
)
if can_connect:
if is_reconfigure:
return self.async_update_and_abort(
reconfigure_entry,
data=validated_user_input,
)
return self.async_create_entry(
title=validated_user_input[CONF_BROKER],
data=validated_user_input,
)
errors["base"] = "cannot_connect"
return self.async_show_form(
step_id="broker", data_schema=vol.Schema(fields), errors=errors
schema = self.add_suggested_values_to_schema(
schema, (entry_defaults or {}) | (user_input or {})
)
return self.async_show_form(step_id="broker", data_schema=schema, errors=errors)
async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
@@ -4688,8 +4743,8 @@ class MQTTSubentryFlowHandler(ConfigSubentryFlow):
if user_input is not None:
new_device_data: dict[str, Any] = user_input.copy()
_, errors = validate_user_input(user_input, MQTT_DEVICE_PLATFORM_FIELDS)
if "other_settings" in new_device_data:
new_device_data |= new_device_data.pop("other_settings")
if OTHER_SETTINGS in new_device_data:
new_device_data |= new_device_data.pop(OTHER_SETTINGS)
if not errors:
self._subentry_data[CONF_DEVICE] = cast(MqttDeviceData, new_device_data)
if self.source == SOURCE_RECONFIGURE:
@@ -5250,331 +5305,162 @@ async def _get_uploaded_file(hass: HomeAssistant, id: str) -> bytes:
return await hass.async_add_executor_job(_proces_uploaded_file)
def _validate_pki_file(
file_id: str | None, pem_data: str | None, errors: dict[str, str], error: str
) -> bool:
"""Return False if uploaded file could not be converted to PEM format."""
if file_id and not pem_data:
errors["base"] = error
return False
return True
async def async_get_broker_settings(
flow: ConfigFlow | OptionsFlow,
fields: OrderedDict[Any, Any],
async def async_validate_broker_settings(
flow: FlowHandler,
entry_config: MappingProxyType[str, Any] | None,
user_input: dict[str, Any] | None,
validated_user_input: dict[str, Any],
entry_config_update: dict[str, Any],
errors: dict[str, str],
) -> bool:
"""Build the config flow schema to collect the broker settings.
"""Validate the broker settings, and return the updated entry dataset."""
Shows advanced options if one or more are configured
or when the advanced_broker_options checkbox was selected.
Returns True when settings are collected successfully.
"""
hass = flow.hass
advanced_broker_options: bool = False
user_input_basic: dict[str, Any] = {}
current_config: dict[str, Any] = (
entry_config.copy() if entry_config is not None else {}
)
async def _async_validate_broker_settings(
config: dict[str, Any],
user_input: dict[str, Any],
validated_user_input: dict[str, Any],
errors: dict[str, str],
async def _async_process_file_upload(
upload_id: str,
field: str,
pem_type: PEMType,
error_code: str,
password: str | None = None,
) -> bool:
"""Additional validation on broker settings for better error messages."""
if CONF_PROTOCOL not in validated_user_input:
validated_user_input[CONF_PROTOCOL] = DEFAULT_PROTOCOL
# Get current certificate settings from config entry
certificate: str | None = (
"auto"
if user_input.get(SET_CA_CERT, "off") == "auto"
else config.get(CONF_CERTIFICATE)
if user_input.get(SET_CA_CERT, "off") == "custom"
else None
)
client_certificate: str | None = (
config.get(CONF_CLIENT_CERT) if user_input.get(SET_CLIENT_CERT) else None
)
client_key: str | None = (
config.get(CONF_CLIENT_KEY) if user_input.get(SET_CLIENT_CERT) else None
)
# Prepare entry update with uploaded files
validated_user_input.update(user_input)
client_certificate_id: str | None = user_input.get(CONF_CLIENT_CERT)
client_key_id: str | None = user_input.get(CONF_CLIENT_KEY)
# We do not store the private key password in the entry data
client_key_password: str | None = validated_user_input.pop(
CONF_CLIENT_KEY_PASSWORD, None
)
if (client_certificate_id and not client_key_id) or (
not client_certificate_id and client_key_id
):
errors["base"] = "invalid_inclusion"
return False
certificate_id: str | None = user_input.get(CONF_CERTIFICATE)
if certificate_id:
certificate_data_raw = await _get_uploaded_file(hass, certificate_id)
certificate = async_convert_to_pem(
certificate_data_raw, PEMType.CERTIFICATE
)
if not _validate_pki_file(
certificate_id, certificate, errors, "bad_certificate"
):
return False
# Return to form for file upload CA cert or client cert and key
if (
(
not client_certificate
and user_input.get(SET_CLIENT_CERT)
and not client_certificate_id
)
or (
not certificate
and user_input.get(SET_CA_CERT, "off") == "custom"
and not certificate_id
)
or (
user_input.get(CONF_TRANSPORT) == TRANSPORT_WEBSOCKETS
and CONF_WS_PATH not in user_input
)
):
return False
if client_certificate_id:
client_certificate_data = await _get_uploaded_file(
hass, client_certificate_id
)
client_certificate = async_convert_to_pem(
client_certificate_data, PEMType.CERTIFICATE
)
if not _validate_pki_file(
client_certificate_id, client_certificate, errors, "bad_client_cert"
):
return False
if client_key_id:
client_key_data = await _get_uploaded_file(hass, client_key_id)
client_key = async_convert_to_pem(
client_key_data, PEMType.PRIVATE_KEY, password=client_key_password
)
if not _validate_pki_file(
client_key_id, client_key, errors, "client_key_error"
):
return False
certificate_data: dict[str, Any] = {}
if certificate:
certificate_data[CONF_CERTIFICATE] = certificate
if client_certificate:
certificate_data[CONF_CLIENT_CERT] = client_certificate
certificate_data[CONF_CLIENT_KEY] = client_key
validated_user_input.update(certificate_data)
await async_create_certificate_temp_files(hass, certificate_data)
if error := await hass.async_add_executor_job(
check_certicate_chain,
):
errors["base"] = error
return False
validated_user_input.pop(SET_CA_CERT, None)
validated_user_input.pop(SET_CLIENT_CERT, None)
if validated_user_input.get(CONF_TRANSPORT, TRANSPORT_TCP) == TRANSPORT_TCP:
validated_user_input.pop(CONF_WS_PATH, None)
validated_user_input.pop(CONF_WS_HEADERS, None)
return True
"""Get uploaded file, or a preserved copy, and convert to a PEM file."""
try:
validated_user_input[CONF_WS_HEADERS] = json_loads(
validated_user_input.get(CONF_WS_HEADERS, "{}")
data_raw = await _get_uploaded_file(hass, upload_id)
except ValueError:
# Use preserved file if available.
# When an uploaded file was read, but an error occurs,
# the form will reload but the temporary file from the upload
# will not be available any more. If it was processed correctly,
# we can use the preserved copy.
if upload_id in flow.last_uploaded:
data_raw = flow.last_uploaded[upload_id]
else:
raise
else:
# Preserve a copy in case the validation fails,
# and we need it later
flow.last_uploaded[upload_id] = data_raw
pem_data = async_convert_to_pem(data_raw, pem_type, password)
if upload_id and not pem_data:
errors["base"] = error_code
return False
entry_config_update[field] = pem_data
return True
if user_input is None:
return False
hass = flow.hass
# Copy basic and other entry fields
entry_config_update |= user_input
entry_config_update.update(entry_config_update.pop(OTHER_SETTINGS))
# Pop incompatible fields for update
for key in (
SET_CA_CERT,
SET_CLIENT_CERT,
CONF_CERTIFICATE,
CONF_CLIENT_CERT,
CONF_CLIENT_KEY,
CONF_CLIENT_KEY_PASSWORD,
):
entry_config_update.pop(key, None)
# Get current CA certificate settings from config entry
if (set_ca_cert := user_input[OTHER_SETTINGS][SET_CA_CERT]) == "auto":
entry_config_update[CONF_CERTIFICATE] = "auto"
elif (
entry_config is not None
and set_ca_cert == "custom"
and (current_cert := entry_config.get(CONF_CERTIFICATE))
):
entry_config_update[CONF_CERTIFICATE] = current_cert
# Prepare entry update with uploaded certificate files
# converted to PEM format
new_client_certificate: str | None = user_input[OTHER_SETTINGS].get(
CONF_CLIENT_CERT
)
new_client_key: str | None = user_input[OTHER_SETTINGS].get(CONF_CLIENT_KEY)
set_client_cert = user_input[OTHER_SETTINGS][SET_CLIENT_CERT]
if (new_client_certificate and not new_client_key) or (
not new_client_certificate and new_client_key
):
errors["base"] = "invalid_inclusion"
return False
if new_certificate := user_input[OTHER_SETTINGS].get(CONF_CERTIFICATE):
if not await _async_process_file_upload(
new_certificate, CONF_CERTIFICATE, PEMType.CERTIFICATE, "bad_certificate"
):
return False
if new_client_certificate:
if not await _async_process_file_upload(
new_client_certificate,
CONF_CLIENT_CERT,
PEMType.CERTIFICATE,
"bad_client_cert",
):
return False
elif (
entry_config is not None
and set_client_cert
and (client_cert := entry_config.get(CONF_CLIENT_CERT))
):
entry_config_update[CONF_CLIENT_CERT] = client_cert
if new_client_key:
if not await _async_process_file_upload(
new_client_key,
CONF_CLIENT_KEY,
PEMType.PRIVATE_KEY,
"client_key_error",
password=user_input[OTHER_SETTINGS].get(CONF_CLIENT_KEY_PASSWORD),
):
return False
elif (
entry_config is not None
and set_client_cert
and (client_key := entry_config.get(CONF_CLIENT_KEY))
):
entry_config_update[CONF_CLIENT_KEY] = client_key
# We temporarily create the current and new uploaded certificate files
# and we check the certificate chain.
await async_create_certificate_temp_files(hass, entry_config_update)
if error := await hass.async_add_executor_job(
check_certicate_chain,
):
errors["base"] = error
return False
if user_input[OTHER_SETTINGS].get(CONF_TRANSPORT, TRANSPORT_TCP) == TRANSPORT_TCP:
entry_config_update.pop(CONF_WS_PATH, None)
entry_config_update.pop(CONF_WS_HEADERS, None)
else:
# Web socket transport
try:
entry_config_update[CONF_WS_HEADERS] = json_loads(
user_input[OTHER_SETTINGS].get(CONF_WS_HEADERS, "{}")
)
schema = vol.Schema({cv.string: cv.template})
schema(validated_user_input[CONF_WS_HEADERS])
schema = vol.Schema({str: str})
schema(entry_config_update[CONF_WS_HEADERS])
except (*JSON_DECODE_EXCEPTIONS, vol.MultipleInvalid):
errors["base"] = "bad_ws_headers"
return False
# Test the configuration
if entry_config is not None:
update_password_from_user_input(
entry_config.get(CONF_PASSWORD), entry_config_update
)
if await hass.async_add_executor_job(
try_connection,
entry_config_update,
):
return True
if user_input:
user_input_basic = user_input.copy()
advanced_broker_options = user_input_basic.get(ADVANCED_OPTIONS, False)
if ADVANCED_OPTIONS not in user_input or advanced_broker_options is False:
if await _async_validate_broker_settings(
current_config,
user_input_basic,
validated_user_input,
errors,
):
return True
# Get defaults settings from previous post
current_broker = user_input_basic.get(CONF_BROKER)
current_port = user_input_basic.get(CONF_PORT, DEFAULT_PORT)
current_user = user_input_basic.get(CONF_USERNAME)
current_pass = user_input_basic.get(CONF_PASSWORD)
else:
# Get default settings from entry (if any)
current_broker = current_config.get(CONF_BROKER)
current_port = current_config.get(CONF_PORT, DEFAULT_PORT)
current_user = current_config.get(CONF_USERNAME)
# Return the sentinel password to avoid exposure
current_entry_pass = current_config.get(CONF_PASSWORD)
current_pass = PWD_NOT_CHANGED if current_entry_pass else None
# Treat the previous post as an update of the current settings
# (if there was a basic broker setup step)
current_config.update(user_input_basic)
# Get default settings for advanced broker options
current_client_id = current_config.get(CONF_CLIENT_ID)
current_keepalive = current_config.get(CONF_KEEPALIVE, DEFAULT_KEEPALIVE)
current_ca_certificate = current_config.get(CONF_CERTIFICATE)
current_client_certificate = current_config.get(CONF_CLIENT_CERT)
current_client_key = current_config.get(CONF_CLIENT_KEY)
current_tls_insecure = current_config.get(CONF_TLS_INSECURE, False)
current_protocol = current_config.get(CONF_PROTOCOL, DEFAULT_PROTOCOL)
current_transport = current_config.get(CONF_TRANSPORT, DEFAULT_TRANSPORT)
current_ws_path = current_config.get(CONF_WS_PATH, DEFAULT_WS_PATH)
current_ws_headers = (
json_dumps(current_config.get(CONF_WS_HEADERS))
if CONF_WS_HEADERS in current_config
else None
)
advanced_broker_options |= bool(
current_client_id
or current_keepalive != DEFAULT_KEEPALIVE
or current_ca_certificate
or current_client_certificate
or current_client_key
or current_tls_insecure
or current_config.get(SET_CA_CERT, "off") != "off"
or current_config.get(SET_CLIENT_CERT)
or current_transport == TRANSPORT_WEBSOCKETS
)
# Build form
fields[vol.Required(CONF_BROKER, default=current_broker)] = TEXT_SELECTOR
fields[vol.Required(CONF_PORT, default=current_port)] = PORT_SELECTOR
fields[
vol.Optional(
CONF_PROTOCOL,
description={"suggested_value": current_protocol},
)
] = PROTOCOL_SELECTOR
fields[
vol.Optional(
CONF_USERNAME,
description={"suggested_value": current_user},
)
] = TEXT_SELECTOR
fields[
vol.Optional(
CONF_PASSWORD,
description={"suggested_value": current_pass},
)
] = PASSWORD_SELECTOR
# show advanced options checkbox if no defaults
# of the advanced options are overridden
if not advanced_broker_options:
fields[
vol.Optional(
ADVANCED_OPTIONS,
)
] = BOOLEAN_SELECTOR
return False
fields[
vol.Optional(
CONF_CLIENT_ID,
description={"suggested_value": current_client_id},
)
] = TEXT_SELECTOR
fields[
vol.Optional(
CONF_KEEPALIVE,
description={"suggested_value": current_keepalive},
)
] = KEEPALIVE_SELECTOR
fields[
vol.Optional(
SET_CLIENT_CERT,
default=current_client_certificate is not None
or current_config.get(SET_CLIENT_CERT) is True,
)
] = BOOLEAN_SELECTOR
if (
current_client_certificate is not None
or current_config.get(SET_CLIENT_CERT) is True
):
fields[
vol.Optional(
CONF_CLIENT_CERT,
description={"suggested_value": user_input_basic.get(CONF_CLIENT_CERT)},
)
] = CERT_UPLOAD_SELECTOR
fields[
vol.Optional(
CONF_CLIENT_KEY,
description={"suggested_value": user_input_basic.get(CONF_CLIENT_KEY)},
)
] = CERT_KEY_UPLOAD_SELECTOR
fields[
vol.Optional(
CONF_CLIENT_KEY_PASSWORD,
description={
"suggested_value": user_input_basic.get(CONF_CLIENT_KEY_PASSWORD)
},
)
] = PASSWORD_SELECTOR
verification_mode = current_config.get(SET_CA_CERT) or (
"off"
if current_ca_certificate is None
else "auto"
if current_ca_certificate == "auto"
else "custom"
)
fields[
vol.Optional(
SET_CA_CERT,
default=verification_mode,
)
] = BROKER_VERIFICATION_SELECTOR
if current_ca_certificate is not None or verification_mode == "custom":
fields[
vol.Optional(
CONF_CERTIFICATE,
user_input_basic.get(CONF_CERTIFICATE),
)
] = CA_CERT_UPLOAD_SELECTOR
fields[
vol.Optional(
CONF_TLS_INSECURE,
description={"suggested_value": current_tls_insecure},
)
] = BOOLEAN_SELECTOR
fields[
vol.Optional(
CONF_TRANSPORT,
description={"suggested_value": current_transport},
)
] = TRANSPORT_SELECTOR
if current_transport == TRANSPORT_WEBSOCKETS:
fields[
vol.Optional(CONF_WS_PATH, description={"suggested_value": current_ws_path})
] = TEXT_SELECTOR
fields[
vol.Optional(
CONF_WS_HEADERS, description={"suggested_value": current_ws_headers}
)
] = WS_HEADERS_SELECTOR
# Show form
errors["base"] = "cannot_connect"
return False
-1
View File
@@ -315,7 +315,6 @@ DEFAULT_TILT_MAX = 100
DEFAULT_TILT_MIN = 0
DEFAULT_TILT_OPEN_POSITION = 100
DEFAULT_TILT_OPTIMISTIC = False
DEFAULT_WS_HEADERS: dict[str, str] = {}
DEFAULT_WS_PATH = "/"
DEFAULT_POSITION_CLOSED = 0
DEFAULT_POSITION_OPEN = 100
+36 -71
View File
@@ -26,46 +26,53 @@
"step": {
"broker": {
"data": {
"advanced_options": "Advanced options",
"broker": "Broker",
"certificate": "Upload custom CA certificate file",
"client_cert": "Upload client certificate file",
"client_id": "Client ID (leave empty to randomly generated one)",
"client_key": "Upload private key file",
"client_key_password": "[%key:common::config_flow::data::password%]",
"keepalive": "The time between sending keep alive messages",
"password": "[%key:common::config_flow::data::password%]",
"port": "[%key:common::config_flow::data::port%]",
"protocol": "MQTT protocol",
"set_ca_cert": "Broker certificate validation",
"set_client_cert": "Use a client certificate",
"tls_insecure": "Ignore broker certificate validation",
"transport": "MQTT transport",
"username": "[%key:common::config_flow::data::username%]",
"ws_headers": "WebSocket headers in JSON format",
"ws_path": "WebSocket path"
"username": "[%key:common::config_flow::data::username%]"
},
"data_description": {
"advanced_options": "Enable and select **Submit** to set advanced options.",
"broker": "The hostname or IP address of your MQTT broker.",
"certificate": "The custom CA certificate file to validate your MQTT broker's certificate.",
"client_cert": "The client certificate to authenticate against your MQTT broker.",
"client_id": "The unique ID to identify the Home Assistant MQTT API as MQTT client. It is recommended to leave this option blank.",
"client_key": "The private key file that belongs to your client certificate.",
"client_key_password": "The password for the private key file (if set).",
"keepalive": "A value less than 90 seconds is advised.",
"password": "The password to log in to your MQTT broker.",
"port": "The port your MQTT broker listens to. For example 1883.",
"protocol": "The MQTT protocol version that is used. Note that Home Assistant will silently change to version 5 if your broker supports it.",
"set_ca_cert": "Select **Auto** for automatic CA validation, or **Custom** and select **Next** to set a custom CA certificate, to allow validating your MQTT broker's certificate.",
"set_client_cert": "Enable and select **Next** to set a client certificate and private key to authenticate against your MQTT broker.",
"tls_insecure": "Option to ignore validation of your MQTT broker's certificate.",
"transport": "The transport to be used for the connection to your MQTT broker.",
"username": "The username to log in to your MQTT broker.",
"ws_headers": "The WebSocket headers to pass through the WebSocket-based connection to your MQTT broker.",
"ws_path": "The WebSocket path to be used for the connection to your MQTT broker."
"username": "The username to log in to your MQTT broker."
},
"description": "Please enter the connection information of your MQTT broker."
"description": "Please enter the connection information of your MQTT broker.",
"sections": {
"other_settings": {
"data": {
"certificate": "Upload custom CA certificate file",
"client_cert": "Upload client certificate file",
"client_id": "Client ID (leave empty for a randomly generated one)",
"client_key": "Upload private key file",
"client_key_password": "[%key:common::config_flow::data::password%]",
"keepalive": "The time between sending keep alive messages",
"set_ca_cert": "Broker certificate validation",
"set_client_cert": "Use a client certificate",
"tls_insecure": "Ignore broker certificate validation",
"transport": "MQTT transport",
"ws_headers": "WebSocket headers in JSON format",
"ws_path": "WebSocket path"
},
"data_description": {
"certificate": "The custom CA certificate file to validate your MQTT broker's certificate.",
"client_cert": "The client certificate to authenticate against your MQTT broker.",
"client_id": "The unique ID to identify the Home Assistant MQTT API as MQTT client. It is recommended to leave this option blank.",
"client_key": "The private key file that belongs to your client certificate.",
"client_key_password": "The password for the private key file (if set).",
"keepalive": "A value less than 90 seconds is advised. Defaults to 60 seconds.",
"set_ca_cert": "When already set to **Custom**, a custom CA validation certificate is configured. Select **Auto** for automatic CA validation, or upload a custom CA certificate, to allow validating your MQTT broker's certificate.",
"set_client_cert": "When already selected, client certificate authentication is enabled. Upload a client certificate and key to enable.",
"tls_insecure": "Option to ignore validation of your MQTT broker's certificate.",
"transport": "The transport to be used for the connection to your MQTT broker.",
"ws_headers": "The WebSocket headers to pass through the WebSocket-based connection to your MQTT broker.",
"ws_path": "The WebSocket path to be used for the connection to your MQTT broker."
},
"name": "Other settings"
}
}
},
"hassio_confirm": {
"description": "Do you want to configure Home Assistant to connect to the MQTT broker provided by the {addon} app?",
@@ -1178,48 +1185,6 @@
"invalid_inclusion": "[%key:component::mqtt::config::error::invalid_inclusion%]"
},
"step": {
"broker": {
"data": {
"advanced_options": "[%key:component::mqtt::config::step::broker::data::advanced_options%]",
"broker": "[%key:component::mqtt::config::step::broker::data::broker%]",
"certificate": "[%key:component::mqtt::config::step::broker::data::certificate%]",
"client_cert": "[%key:component::mqtt::config::step::broker::data::client_cert%]",
"client_id": "[%key:component::mqtt::config::step::broker::data::client_id%]",
"client_key": "[%key:component::mqtt::config::step::broker::data::client_key%]",
"keepalive": "[%key:component::mqtt::config::step::broker::data::keepalive%]",
"password": "[%key:common::config_flow::data::password%]",
"port": "[%key:common::config_flow::data::port%]",
"protocol": "[%key:component::mqtt::config::step::broker::data::protocol%]",
"set_ca_cert": "[%key:component::mqtt::config::step::broker::data::set_ca_cert%]",
"set_client_cert": "[%key:component::mqtt::config::step::broker::data::set_client_cert%]",
"tls_insecure": "[%key:component::mqtt::config::step::broker::data::tls_insecure%]",
"transport": "[%key:component::mqtt::config::step::broker::data::transport%]",
"username": "[%key:common::config_flow::data::username%]",
"ws_headers": "[%key:component::mqtt::config::step::broker::data::ws_headers%]",
"ws_path": "[%key:component::mqtt::config::step::broker::data::ws_path%]"
},
"data_description": {
"advanced_options": "[%key:component::mqtt::config::step::broker::data_description::advanced_options%]",
"broker": "[%key:component::mqtt::config::step::broker::data_description::broker%]",
"certificate": "[%key:component::mqtt::config::step::broker::data_description::certificate%]",
"client_cert": "[%key:component::mqtt::config::step::broker::data_description::client_cert%]",
"client_id": "[%key:component::mqtt::config::step::broker::data_description::client_id%]",
"client_key": "[%key:component::mqtt::config::step::broker::data_description::client_key%]",
"keepalive": "[%key:component::mqtt::config::step::broker::data_description::keepalive%]",
"password": "[%key:component::mqtt::config::step::broker::data_description::password%]",
"port": "[%key:component::mqtt::config::step::broker::data_description::port%]",
"protocol": "[%key:component::mqtt::config::step::broker::data_description::protocol%]",
"set_ca_cert": "[%key:component::mqtt::config::step::broker::data_description::set_ca_cert%]",
"set_client_cert": "[%key:component::mqtt::config::step::broker::data_description::set_client_cert%]",
"tls_insecure": "[%key:component::mqtt::config::step::broker::data_description::tls_insecure%]",
"transport": "[%key:component::mqtt::config::step::broker::data_description::transport%]",
"username": "[%key:component::mqtt::config::step::broker::data_description::username%]",
"ws_headers": "[%key:component::mqtt::config::step::broker::data_description::ws_headers%]",
"ws_path": "[%key:component::mqtt::config::step::broker::data_description::ws_path%]"
},
"description": "[%key:component::mqtt::config::step::broker::description%]",
"title": "Broker options"
},
"options": {
"data": {
"birth_enable": "Enable birth message",
+1 -1
View File
@@ -6,8 +6,8 @@ import logging
from typing import Any
import ollama
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components import conversation
from homeassistant.config_entries import ConfigSubentry
@@ -22,8 +22,8 @@ from openai.types.chat import (
from openai.types.chat.chat_completion_message_function_tool_call_param import Function
from openai.types.shared_params import FunctionDefinition, ResponseFormatJSONSchema
from openai.types.shared_params.response_format_json_schema import JSONSchema
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components import conversation
from homeassistant.config_entries import ConfigSubentry
@@ -6,8 +6,8 @@ import logging
from typing import Any, override
import openai
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components.zone import ENTITY_ID_HOME
from homeassistant.config_entries import (
@@ -56,8 +56,8 @@ from openai.types.responses.tool_param import (
ImageGeneration,
)
from openai.types.responses.web_search_tool_param import UserLocation
from probatio import to_openapi as convert
import voluptuous as vol
from voluptuous_openapi import convert
from homeassistant.components import conversation
from homeassistant.config_entries import ConfigSubentry
@@ -18,7 +18,7 @@ from openai.types.chat import (
)
from openai.types.chat.chat_completion_message_function_tool_call_param import Function
from openai.types.shared_params import FunctionDefinition
from probatio import to_openapi as convert
from voluptuous_openapi import convert
from homeassistant.components import conversation
from homeassistant.config_entries import ConfigSubentry
@@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, override
from pushbullet import PushBullet, PushError
from pushbullet.channel import Channel
from pushbullet.device import Device
import voluptuous as vol
from homeassistant.components.notify import (
ATTR_DATA,
@@ -143,7 +144,7 @@ class PushBulletNotificationService(BaseNotificationService):
raise ValueError("Cannot send an empty file")
kwargs.update(filedata)
pusher.push_file(**kwargs)
elif file_url := data.get(ATTR_FILE_URL):
elif (file_url := data.get(ATTR_FILE_URL)) and vol.Url(file_url):
pusher.push_file(
file_name=file_url,
file_url=file_url,
@@ -6,7 +6,6 @@ from typing import TYPE_CHECKING, Any, Literal, override
from aiohttp import web
import voluptuous as vol
from voluptuous.humanize import humanize_error
from homeassistant.auth.models import RefreshToken, User
from homeassistant.core import Context, HomeAssistant, callback
@@ -296,7 +295,7 @@ class ActiveConnection:
err_message = "Unauthorized"
elif isinstance(err, vol.Invalid):
code = const.ERR_INVALID_FORMAT
err_message = humanize_error(msg, err)
err_message = vol.humanize.humanize_error(msg, err)
elif isinstance(err, TimeoutError):
code = const.ERR_TIMEOUT
err_message = "Timeout"
@@ -779,7 +779,7 @@ async def websocket_device_cluster_commands(
hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any]
) -> None:
"""Return a list of cluster commands."""
from probatio import serialize # noqa: PLC0415
import voluptuous_serialize # noqa: PLC0415
zha_gateway = get_zha_gateway(hass)
ieee: EUI64 = msg[ATTR_IEEE]
@@ -801,7 +801,7 @@ async def websocket_device_cluster_commands(
TYPE: CLIENT,
ID: cmd_id,
ATTR_NAME: cmd.name,
"schema": serialize(
"schema": voluptuous_serialize.convert(
cluster_command_schema_to_vol_schema(cmd.schema),
custom_serializer=cv.custom_serializer,
),
@@ -813,7 +813,7 @@ async def websocket_device_cluster_commands(
TYPE: CLUSTER_COMMAND_SERVER,
ID: cmd_id,
ATTR_NAME: cmd.name,
"schema": serialize(
"schema": voluptuous_serialize.convert(
cluster_command_schema_to_vol_schema(cmd.schema),
custom_serializer=cv.custom_serializer,
),
@@ -1087,14 +1087,16 @@ async def websocket_get_configuration(
) -> None:
"""Get ZHA configuration."""
config_entry: ConfigEntry = get_config_entry(hass)
from probatio import serialize # noqa: PLC0415
import voluptuous_serialize # noqa: PLC0415
def custom_serializer(schema: Any) -> Any:
"""Serialize additional types for the field-list serializer."""
"""Serialize additional types for voluptuous_serialize."""
if schema is cv_boolean:
return {"type": "bool"}
if schema is vol.Schema:
return serialize(schema, custom_serializer=custom_serializer)
return voluptuous_serialize.convert(
schema, custom_serializer=custom_serializer
)
return cv.custom_serializer(schema)
@@ -1104,7 +1106,7 @@ async def websocket_get_configuration(
hass, IasAce.cluster_id
):
continue
data["schemas"][section] = serialize(
data["schemas"][section] = voluptuous_serialize.convert(
schema, custom_serializer=custom_serializer
)
data["data"][section] = config_entry.options.get(CUSTOM_CONFIGURATION, {}).get(
+1 -1
View File
@@ -478,7 +478,7 @@ def stringify_invalid(
if annotation := find_annotation(config, exc.path):
message_prefix += f" at {_relpath(hass, annotation[0])}, line {annotation[1]}"
path = "->".join(str(m) for m in exc.path)
if exc.code == "extra_keys_not_allowed":
if exc.error_message == "extra keys not allowed":
return (
f"{message_prefix}: '{exc.path[-1]}' is an invalid option for '{domain}', "
f"check: {path}{message_suffix}"
-3
View File
@@ -527,9 +527,6 @@ class HomeAssistant:
self._tasks,
)
# Allow automations to set up the start triggers before changing state
await asyncio.sleep(0)
if self.state is not CoreState.starting:
_LOGGER.warning(
"Home Assistant startup has been interrupted. "
+3 -3
View File
@@ -23,8 +23,8 @@ from typing import TYPE_CHECKING, Any, cast, overload
from urllib.parse import urlparse
from uuid import UUID
from probatio import UNSUPPORTED, serialize
import voluptuous as vol
import voluptuous_serialize
from homeassistant.const import (
ATTR_AREA_ID,
@@ -1188,7 +1188,7 @@ def _custom_serializer(schema: Any, *, allow_section: bool) -> Any:
raise ValueError("Nesting expandable sections is not supported")
return {
"type": "expandable",
"schema": serialize(
"schema": voluptuous_serialize.convert(
schema.schema,
custom_serializer=functools.partial(
_custom_serializer, allow_section=False
@@ -1203,7 +1203,7 @@ def _custom_serializer(schema: Any, *, allow_section: bool) -> Any:
if isinstance(schema, selector.Selector):
return schema.serialize()
return UNSUPPORTED
return voluptuous_serialize.UNSUPPORTED
# Schemas
+2 -2
View File
@@ -4,8 +4,8 @@ from http import HTTPStatus
from typing import Any, Generic, TypeVar
from aiohttp import web
from probatio import serialize
import voluptuous as vol
import voluptuous_serialize
from homeassistant import data_entry_flow
from homeassistant.components.http import HomeAssistantView
@@ -47,7 +47,7 @@ class _BaseFlowManagerView(HomeAssistantView, Generic[_FlowManagerT]):
if (schema := result["data_schema"]) is None:
data["data_schema"] = []
else:
data["data_schema"] = serialize(
data["data_schema"] = voluptuous_serialize.convert(
schema, custom_serializer=cv.custom_serializer
)
return data
+1 -1
View File
@@ -10,9 +10,9 @@ from functools import cache, partial
from operator import attrgetter
from typing import Any, cast, override
from probatio import UNSUPPORTED, to_openapi as convert
import slugify as unicode_slug
import voluptuous as vol
from voluptuous_openapi import UNSUPPORTED, convert
from homeassistant.components.calendar import (
DOMAIN as CALENDAR_DOMAIN,
+4 -2
View File
@@ -51,7 +51,6 @@ orjson==3.11.9
packaging>=23.1
paho-mqtt==2.1.0
Pillow==12.2.0
probatio==0.5.4
propcache==0.5.2
psutil-home-assistant==0.0.1
PyJWT==2.12.1
@@ -71,7 +70,10 @@ standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0
ulid-transform==2.2.9
urllib3>=2.0
uv==0.11.21
uv==0.11.25
voluptuous-openapi==0.4.1
voluptuous-serialize==2.7.0
voluptuous==0.15.2
webrtc-models==0.3.0
yarl==1.24.2
zeroconf==0.150.0
Generated
-1
View File
@@ -5,7 +5,6 @@
[mypy]
python_version = 3.14
platform = linux
mypy_path = stubs
plugins = pydantic.mypy
show_error_codes = true
follow_imports = normal
+5 -2
View File
@@ -74,8 +74,10 @@ dependencies = [
"typing-extensions>=4.15.0,<5.0",
"ulid-transform==2.2.9",
"urllib3>=2.0",
"uv==0.11.21",
"probatio==0.5.4",
"uv==0.11.25",
"voluptuous==0.15.2",
"voluptuous-serialize==2.7.0",
"voluptuous-openapi==0.4.1",
"yarl==1.24.2",
"webrtc-models==0.3.0",
"zeroconf==0.150.0",
@@ -783,6 +785,7 @@ ignore = [
]
[tool.ruff.lint.flake8-import-conventions.extend-aliases]
voluptuous = "vol"
"homeassistant.components.air_quality.PLATFORM_SCHEMA" = "AIR_QUALITY_PLATFORM_SCHEMA"
"homeassistant.components.alarm_control_panel.PLATFORM_SCHEMA" = "ALARM_CONTROL_PANEL_PLATFORM_SCHEMA"
"homeassistant.components.binary_sensor.PLATFORM_SCHEMA" = "BINARY_SENSOR_PLATFORM_SCHEMA"
+4 -2
View File
@@ -37,7 +37,6 @@ mutagen==1.47.0
orjson==3.11.9
packaging>=23.1
Pillow==12.2.0
probatio==0.5.4
propcache==0.5.2
psutil-home-assistant==0.0.1
PyJWT==2.12.1
@@ -56,7 +55,10 @@ standard-telnetlib==3.13.0
typing-extensions>=4.15.0,<5.0
ulid-transform==2.2.9
urllib3>=2.0
uv==0.11.21
uv==0.11.25
voluptuous-openapi==0.4.1
voluptuous-serialize==2.7.0
voluptuous==0.15.2
webrtc-models==0.3.0
yarl==1.24.2
zeroconf==0.150.0
+2 -2
View File
@@ -1474,7 +1474,7 @@ libpyvivotek==0.6.1
librehardwaremonitor-api==1.11.1
# homeassistant.components.mikrotik
librouteros==3.2.1
librouteros==4.1.1
# homeassistant.components.soundtouch
libsoundtouch==0.8
@@ -2148,7 +2148,7 @@ pyegps==0.2.5
pyemoncms==0.1.3
# homeassistant.components.enphase_envoy
pyenphase==3.0.0
pyenphase==3.0.1
# homeassistant.components.envertech_evt800
pyenvertechevt800==0.2.4
+1 -1
View File
@@ -10,7 +10,7 @@
# ast-serialize is an internal mypy dependency
ast-serialize==0.3.0
astroid==4.0.4
coverage==7.14.2
coverage==7.14.3
freezegun==1.5.5
# librt is an internal mypy dependency
librt==0.11.0
-12
View File
@@ -1,13 +1 @@
"""Home Assistant scripts."""
import contextlib
# Scripts such as hassfest (and the libraries they import) use voluptuous. Alias it
# to probatio before anything imports it, the same as homeassistant/__init__.py does
# for the application itself. This must run before the first `import voluptuous`.
# Some scripts (for example check_requirements) run before dependencies are
# installed, so probatio may be absent; those scripts do not need the alias.
with contextlib.suppress(ImportError):
from probatio.compat import install_as_voluptuous
install_as_voluptuous()
-3
View File
@@ -31,9 +31,6 @@ HEADER: Final = """
GENERAL_SETTINGS: Final[dict[str, str]] = {
"python_version": ".".join(str(x) for x in REQUIRED_PYTHON_VER[:2]),
"platform": "linux",
# voluptuous is aliased to probatio at runtime; this stub path re-exports
# probatio's types under the voluptuous name for not-yet-migrated integrations.
"mypy_path": "stubs",
"plugins": ", ".join( # noqa: FLY002
[
"pydantic.mypy",
-1
View File
@@ -1 +0,0 @@
from probatio import * # noqa: F403
-2
View File
@@ -1,2 +0,0 @@
# See voluptuous/__init__.pyi. Re-exports probatio.error under the voluptuous name.
from probatio.error import * # noqa: F403
-5
View File
@@ -1,5 +0,0 @@
# See voluptuous/__init__.pyi. Re-exports probatio.humanize under the voluptuous name.
from probatio.humanize import (
MAX_VALIDATION_ERROR_ITEM_LENGTH as MAX_VALIDATION_ERROR_ITEM_LENGTH,
humanize_error as humanize_error,
)
+2 -2
View File
@@ -3,7 +3,7 @@
import asyncio
from unittest.mock import patch
from probatio import serialize
import voluptuous_serialize
from homeassistant import data_entry_flow
from homeassistant.auth import auth_manager_from_config, models as auth_models
@@ -251,7 +251,7 @@ async def test_setup_user_notify_service(hass: HomeAssistant) -> None:
schema = step["data_schema"]
schema({"notify_service": "test2"})
# ensure the schema can be serialized
assert serialize(schema) == [
assert voluptuous_serialize.convert(schema) == [
{
"name": "notify_service",
"options": [
+2 -2
View File
@@ -221,7 +221,7 @@ async def test_generate_data_service_structure_fields(
},
},
vol.Invalid,
r"not a valid option.*",
r"extra keys not allowed.*",
),
(
{
@@ -248,7 +248,7 @@ async def test_generate_data_service_structure_fields(
},
},
vol.Invalid,
r"not a valid option .*",
r"extra keys not allowed .*",
),
(
{
+10 -4
View File
@@ -1,8 +1,8 @@
"""The tests for Climate device actions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.climate import DOMAIN, HVACMode, const, device_action
@@ -398,7 +398,9 @@ async def test_capabilities(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -514,7 +516,9 @@ async def test_capabilities_legacy(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -552,6 +556,8 @@ async def test_capabilities_missing_entity(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -1,8 +1,8 @@
"""The tests for Climate device conditions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.climate import DOMAIN, HVACMode, const, device_condition
@@ -424,7 +424,9 @@ async def test_capabilities(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -541,7 +543,9 @@ async def test_capabilities_legacy(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -580,6 +584,8 @@ async def test_capabilities_missing_entity(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -1,8 +1,8 @@
"""The tests for Climate device triggers."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.climate import (
@@ -332,7 +332,7 @@ async def test_get_trigger_capabilities_hvac_mode(hass: HomeAssistant) -> None:
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -382,7 +382,7 @@ async def test_get_trigger_capabilities_temp_humid(
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -3335,8 +3335,8 @@ async def test_flow_with_multiple_schema_errors_base(
assert data == {
"errors": {
"base": [
"not a valid option @ data['invalid']",
"not a valid option @ data['invalid_2']",
"extra keys not allowed @ data['invalid']",
"extra keys not allowed @ data['invalid_2']",
],
"latitude": "required key not provided",
}
@@ -1,8 +1,8 @@
"""The tests for Device Tracker device triggers."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation, zone
from homeassistant.components.device_automation import DeviceAutomationType
@@ -327,7 +327,7 @@ async def test_get_trigger_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -366,7 +366,7 @@ async def test_get_trigger_capabilities_legacy(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
+5 -5
View File
@@ -279,7 +279,7 @@ async def test_service_schema_validation(
) -> None:
"""Test easyEnergy service schema validation."""
with pytest.raises(vol.error.Error, match=error_message):
with pytest.raises(vol.er.Error, match=error_message):
await hass.services.async_call(
DOMAIN,
service,
@@ -307,7 +307,7 @@ async def test_service_schema_validation_vat(
) -> None:
"""Test easyEnergy service schema validation for VAT."""
with pytest.raises(vol.error.Error, match=error_message):
with pytest.raises(vol.er.Error, match=error_message):
await hass.services.async_call(
DOMAIN,
service,
@@ -341,7 +341,7 @@ async def test_service_schema_validation_usage_price_type(
) -> None:
"""Test usage service schema validation for price type."""
with pytest.raises(vol.error.Error, match=error_message):
with pytest.raises(vol.er.Error, match=error_message):
await hass.services.async_call(
DOMAIN,
service,
@@ -384,7 +384,7 @@ async def test_service_schema_validation_granularity(
if service == ENERGY_USAGE_SERVICE_NAME:
data["incl_vat"] = True
with pytest.raises(vol.error.Error, match=error_message):
with pytest.raises(vol.er.Error, match=error_message):
await hass.services.async_call(
DOMAIN,
service,
@@ -401,7 +401,7 @@ async def test_service_schema_validation_return_vat(
) -> None:
"""Test return prices do not accept VAT selection."""
with pytest.raises(vol.error.Error, match="not a valid option .+"):
with pytest.raises(vol.er.Error, match="extra keys not allowed .+"):
await hass.services.async_call(
DOMAIN,
ENERGY_RETURN_SERVICE_NAME,
+4 -4
View File
@@ -71,23 +71,23 @@ def config_entry_data(
@pytest.mark.parametrize(
("config_entry_data", "service_data", "error", "error_message"),
[
({}, {}, vol.error.Error, "required key not provided .+"),
({}, {}, vol.er.Error, "required key not provided .+"),
(
{"config_entry": True},
{},
vol.error.Error,
vol.er.Error,
"required key not provided .+",
),
(
{},
{"incl_vat": True},
vol.error.Error,
vol.er.Error,
"required key not provided .+",
),
(
{"config_entry": True},
{"incl_vat": "incorrect vat"},
vol.error.Error,
vol.er.Error,
"expected bool for dictionary value .+",
),
(
+1 -1
View File
@@ -547,7 +547,7 @@ async def test_themes_reload_themes(
"modes": {"light": {}, "dank": {}},
}
},
"not a valid option.*dank",
"extra keys not allowed.*dank",
None,
),
],
@@ -1,8 +1,8 @@
"""The tests for Humidifier device actions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -489,7 +489,9 @@ async def test_capabilities(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -631,7 +633,9 @@ async def test_capabilities_legacy(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -670,6 +674,8 @@ async def test_capabilities_missing_entity(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -1,8 +1,8 @@
"""The tests for Humidifier device conditions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -485,7 +485,9 @@ async def test_capabilities(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -657,7 +659,9 @@ async def test_capabilities_legacy(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -696,6 +700,8 @@ async def test_capabilities_missing_entity(
assert capabilities and "extra_fields" in capabilities
assert (
serialize(capabilities["extra_fields"], custom_serializer=cv.custom_serializer)
voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
)
== expected_capabilities
)
@@ -2,9 +2,9 @@
import datetime
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -536,7 +536,7 @@ async def test_get_trigger_capabilities_on(hass: HomeAssistant) -> None:
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -563,7 +563,7 @@ async def test_get_trigger_capabilities_off(hass: HomeAssistant) -> None:
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -590,7 +590,7 @@ async def test_get_trigger_capabilities_humidity(hass: HomeAssistant) -> None:
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
+1 -1
View File
@@ -4,8 +4,8 @@ from collections.abc import Callable
from typing import Any
from unittest.mock import AsyncMock, patch
from probatio import serialize as convert
import pytest
from voluptuous_serialize import convert
from homeassistant import config_entries
from homeassistant.components.insteon.config_flow import (
@@ -30,7 +30,7 @@
'current_address': '0.0.0',
'version': '0.0.0',
}),
'yaml_configuration_error': "not a valid option @ data['knx']['wrong_key']",
'yaml_configuration_error': "extra keys not allowed @ data['knx']['wrong_key']",
})
# ---
# name: test_diagnostic_redact[hass_config0]
+1 -1
View File
@@ -442,7 +442,7 @@ async def test_validate_entity(
assert res["success"], res
assert res["result"]["success"] is False
# This shall test that a required key of the second GroupSelect schema is missing
# and not yield the "not a valid option" error of the first GroupSelect Schema
# and not yield the "extra keys not allowed" error of the first GroupSelect Schema
assert res["result"]["errors"][0]["path"] == [
"data",
"knx",
+3 -3
View File
@@ -2,8 +2,8 @@
import logging
from probatio import serialize
import pytest
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import (
@@ -298,7 +298,7 @@ async def test_get_trigger_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -402,7 +402,7 @@ async def test_invalid_device_trigger(
)
assert (
"Unnamed automation failed to setup triggers and has been disabled: "
"not a valid option @ data['invalid']. Got None"
"extra keys not allowed @ data['invalid']. Got None"
in caplog.records[0].message
)
+1 -1
View File
@@ -2,9 +2,9 @@
from typing import Any
from probatio import serialize as convert
import pytest
import voluptuous as vol
from voluptuous_serialize import convert
from homeassistant.components.knx.const import ColorTempModes
from homeassistant.components.knx.storage.knx_selector import (
+1 -1
View File
@@ -338,6 +338,6 @@ async def test_invalid_trigger(
)
assert (
"Unnamed automation failed to setup triggers and has been disabled: "
"not a valid option @ data['invalid']. Got None"
"extra keys not allowed @ data['invalid']. Got None"
in caplog.records[0].message
)
+5 -5
View File
@@ -1,10 +1,10 @@
"""Tests for LCN device triggers."""
from probatio import serialize
from pypck.inputs import ModSendKeysHost, ModStatusAccessControl
from pypck.lcn_addr import LcnAddr
from pypck.lcn_defs import AccessControlPeriphery, KeyAction, SendKeyCommand
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -347,7 +347,7 @@ async def test_get_transponder_trigger_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -379,7 +379,7 @@ async def test_get_fingerprint_trigger_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -411,7 +411,7 @@ async def test_get_transmitter_trigger_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -464,7 +464,7 @@ async def test_get_send_keys_trigger_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -1379,8 +1379,7 @@ async def test_reload_after_invalid_config(
assert await mqtt_mock_entry()
assert hass.states.get("alarm_control_panel.test") is None
assert (
"not a valid option, did you mean 'availability_topic' or "
"'command_topic'? @ data['invalid_topic'] for "
"extra keys not allowed @ data['invalid_topic'] for "
"manually configured MQTT alarm_control_panel item, "
"in ?, line ? Got {'name': 'test', 'invalid_topic': 'test-topic'}"
in caplog.text
File diff suppressed because it is too large Load Diff
+1 -1
View File
@@ -906,7 +906,7 @@ async def test_setup_manual_mqtt_with_platform_key(
"""Test set up a manual MQTT item with a platform key."""
assert await mqtt_mock_entry()
assert (
"not a valid option @ data['platform']"
"extra keys not allowed @ data['platform']"
" for manually configured MQTT light item" in caplog.text
)
+1 -1
View File
@@ -879,7 +879,7 @@ async def test_update_with_bad_config_not_breaks_discovery(
# Update with bad identifier
async_fire_mqtt_message(hass, "homeassistant/tag/bla1/config", data2)
await hass.async_block_till_done()
assert "not a valid option @ data['device']['bad_key']" in caplog.text
assert "extra keys not allowed @ data['device']['bad_key']" in caplog.text
# Topic update
async_fire_mqtt_message(hass, "homeassistant/tag/bla1/config", data3)
@@ -1,8 +1,8 @@
"""The tests for Number device actions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -264,7 +264,7 @@ async def test_capabilities(
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [{"name": "value", "required": True, "type": "float"}]
@@ -296,6 +296,6 @@ async def test_capabilities_legacy(
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [{"name": "value", "required": True, "type": "float"}]
@@ -1,8 +1,8 @@
"""The tests for Select device actions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -329,7 +329,7 @@ async def test_get_action_capabilities(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -349,7 +349,7 @@ async def test_get_action_capabilities(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -370,7 +370,7 @@ async def test_get_action_capabilities(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -386,7 +386,7 @@ async def test_get_action_capabilities(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -431,7 +431,7 @@ async def test_get_action_capabilities_legacy(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -451,7 +451,7 @@ async def test_get_action_capabilities_legacy(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -472,7 +472,7 @@ async def test_get_action_capabilities_legacy(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -488,7 +488,7 @@ async def test_get_action_capabilities_legacy(
capabilities = await async_get_action_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -1,8 +1,8 @@
"""The tests for Select device conditions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -271,7 +271,7 @@ async def test_get_condition_capabilities(
capabilities = await async_get_condition_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -297,7 +297,7 @@ async def test_get_condition_capabilities(
capabilities = await async_get_condition_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -333,7 +333,7 @@ async def test_get_condition_capabilities_legacy(
capabilities = await async_get_condition_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -359,7 +359,7 @@ async def test_get_condition_capabilities_legacy(
capabilities = await async_get_condition_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -1,8 +1,8 @@
"""The tests for Select device triggers."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -306,7 +306,7 @@ async def test_get_trigger_capabilities(
capabilities = await async_get_trigger_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -340,7 +340,7 @@ async def test_get_trigger_capabilities(
capabilities = await async_get_trigger_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -382,7 +382,7 @@ async def test_get_trigger_capabilities_unknown(
capabilities = await async_get_trigger_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -426,7 +426,7 @@ async def test_get_trigger_capabilities_legacy(
capabilities = await async_get_trigger_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -460,7 +460,7 @@ async def test_get_trigger_capabilities_legacy(
capabilities = await async_get_trigger_capabilities(hass, config)
assert capabilities
assert "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -140,7 +140,7 @@ def test_send_message_with_bad_data_throws_vol_error(
signal_notification_service.send_message(MESSAGE, data={"test": "test"})
assert "Sending signal message" in caplog.text
assert "not a valid option" in str(exc.value)
assert "extra keys not allowed" in str(exc.value)
def test_send_message_styled_with_bad_data_throws_vol_error(
+3 -3
View File
@@ -1,8 +1,8 @@
"""The tests for Text device actions."""
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from homeassistant.components import automation
from homeassistant.components.device_automation import DeviceAutomationType
@@ -248,7 +248,7 @@ async def test_capabilities(
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [{"name": "value", "required": True, "type": "string"}]
@@ -270,6 +270,6 @@ async def test_capabilities_legacy(
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [{"name": "value", "required": True, "type": "string"}]
+2 -2
View File
@@ -3,8 +3,8 @@
import logging
from typing import Any
from probatio import serialize
import pytest
import voluptuous_serialize
from zigpy.application import ControllerApplication
from zigpy.types.basic import uint16_t
from zigpy.zcl.clusters import lighting
@@ -81,7 +81,7 @@ async def test_zcl_schema_conversions(hass: HomeAssistant) -> None:
"required": False,
},
]
vol_schema = serialize(
vol_schema = voluptuous_serialize.convert(
cluster_command_schema_to_vol_schema(command_schema),
custom_serializer=cv.custom_serializer,
)
@@ -2,8 +2,8 @@
from unittest.mock import patch
from probatio import serialize
import pytest
import voluptuous_serialize
from zwave_js_server.client import Client
from zwave_js_server.const import CommandClass
from zwave_js_server.model.node import Node
@@ -547,7 +547,7 @@ async def test_get_action_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -605,7 +605,7 @@ async def test_get_action_capabilities(
("94", "Z-Wave Plus Info"),
]
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -642,7 +642,7 @@ async def test_get_action_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -675,7 +675,7 @@ async def test_get_action_capabilities(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -730,7 +730,7 @@ async def test_get_action_capabilities_lock_triggers(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [{"type": "string", "name": "code_slot", "required": True}]
@@ -747,7 +747,7 @@ async def test_get_action_capabilities_lock_triggers(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{"type": "string", "name": "code_slot", "required": True},
@@ -781,7 +781,7 @@ async def test_get_action_capabilities_meter_triggers(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [{"type": "string", "name": "value", "optional": True, "required": False}]
@@ -2,9 +2,9 @@
from unittest.mock import patch
from probatio import serialize
import pytest
import voluptuous as vol
import voluptuous_serialize
from zwave_js_server.const import CommandClass
from zwave_js_server.event import Event
@@ -446,7 +446,7 @@ async def test_get_condition_capabilities_node_status(
},
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -499,7 +499,7 @@ async def test_get_condition_capabilities_value(
("134", "Version"),
]
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -543,7 +543,7 @@ async def test_get_condition_capabilities_config_parameter(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -574,7 +574,7 @@ async def test_get_condition_capabilities_config_parameter(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -2,9 +2,9 @@
from unittest.mock import patch
from probatio import serialize
import pytest
from pytest_unordered import unordered
import voluptuous_serialize
from zwave_js_server.const import CommandClass
from zwave_js_server.event import Event
from zwave_js_server.model.node import Node
@@ -197,7 +197,7 @@ async def test_get_trigger_capabilities_notification_notification(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == unordered(
[
@@ -337,7 +337,7 @@ async def test_get_trigger_capabilities_entry_control_notification(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == unordered(
[
@@ -589,7 +589,7 @@ async def test_get_trigger_capabilities_node_status(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -797,7 +797,7 @@ async def test_get_trigger_capabilities_basic_value_notification(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -993,7 +993,7 @@ async def test_get_trigger_capabilities_central_scene_value_notification(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -1186,7 +1186,7 @@ async def test_get_trigger_capabilities_scene_activation_value_notification(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -1423,7 +1423,7 @@ async def test_get_trigger_capabilities_value_updated_value(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -1583,7 +1583,7 @@ async def test_get_trigger_capabilities_value_updated_config_parameter_range(
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
@@ -1633,7 +1633,7 @@ async def test_get_trigger_capabilities_value_updated_config_parameter_enumerate
)
assert capabilities and "extra_fields" in capabilities
assert serialize(
assert voluptuous_serialize.convert(
capabilities["extra_fields"], custom_serializer=cv.custom_serializer
) == [
{
+1 -2
View File
@@ -84,8 +84,7 @@ async def test_bad_core_config(hass: HomeAssistant) -> None:
error = CheckConfigError(
(
f"Invalid config for 'homeassistant' at {YAML_CONFIG_FILE}, line 2:"
" expected 'metric' or 'us_customary' or 'imperial' for dictionary"
" value 'unit_system', got 'bad'"
" not a valid value for dictionary value 'unit_system', got 'bad'"
),
"homeassistant",
{"unit_system": "bad"},
+12
View File
@@ -2,7 +2,19 @@
import contextlib
from astroid import nodes
from pylint.checkers import BaseChecker
from pylint.testutils.unittest_linter import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
def walk_checker(
linter: UnittestLinter, checker: BaseChecker, node: nodes.NodeNG
) -> None:
"""Run the given checker over the parsed node."""
walker = ASTWalker(linter)
walker.add_checker(checker)
walker.walk(node)
@contextlib.contextmanager
@@ -2,13 +2,12 @@
import astroid
from pylint.testutils import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.actions.service_registration import (
ServiceRegistrationChecker,
)
import pytest
from tests.pylint import assert_no_messages
from tests.pylint import assert_no_messages, walk_checker
@pytest.fixture(name="registration_checker")
@@ -68,11 +67,9 @@ def test_no_warning(
) -> None:
"""Test cases that should not trigger a warning."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
def test_hass_services_register_flagged(
@@ -87,9 +84,7 @@ async def async_setup_entry(hass, entry):
""",
"homeassistant.components.test_integration",
)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -108,9 +103,7 @@ async def async_setup_entry(hass, entry):
""",
"homeassistant.components.test_integration",
)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -128,9 +121,7 @@ async def async_setup_entry(hass, entry):
""",
"homeassistant.components.test_integration",
)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -148,9 +139,7 @@ async def async_setup_entry(hass, entry):
""",
"homeassistant.components.test_integration",
)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -170,9 +159,7 @@ async def async_setup_entry(hass, entry):
""",
"homeassistant.components.test_integration",
)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 3
@@ -193,9 +180,7 @@ async def async_setup_entry(hass, entry):
""",
"homeassistant.components.test_integration",
)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -219,8 +204,6 @@ async def async_setup_entry(hass, entry):
""",
"homeassistant.components.test_integration",
)
walker = ASTWalker(linter)
walker.add_checker(registration_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, registration_checker, root_node)
@@ -2,13 +2,12 @@
import astroid
from pylint.testutils import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.actions.swallowed_exceptions import (
SwallowedActionExceptionsChecker,
)
import pytest
from tests.pylint import assert_no_messages
from tests.pylint import assert_no_messages, walk_checker
@pytest.fixture(name="error_propagation_checker")
@@ -129,11 +128,9 @@ def test_no_warning(
) -> None:
"""Test cases that should not trigger a warning."""
root_node = astroid.parse(code, "homeassistant.components.test_integration.switch")
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
def test_log_only_flagged(
@@ -152,9 +149,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -179,9 +174,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -203,9 +196,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -229,9 +220,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -255,9 +244,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -280,9 +267,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -304,9 +289,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -334,9 +317,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 2
@@ -356,9 +337,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -379,11 +358,9 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
def test_decorator_swallows_flagged(
@@ -408,9 +385,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -441,9 +416,7 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -471,11 +444,9 @@ class MySwitch(SwitchEntity):
""",
"homeassistant.components.test_integration.switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
def test_custom_service_method_flagged(
@@ -502,9 +473,7 @@ class MyFan(FanEntity):
""",
"homeassistant.components.test_integration.fan",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -535,11 +504,9 @@ class MyFan(FanEntity):
""",
"homeassistant.components.test_integration.fan",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
def test_unregistered_custom_method_ignored(
@@ -558,11 +525,9 @@ class MyFan(FanEntity):
""",
"homeassistant.components.test_integration.fan",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
def test_standalone_service_handler_flagged(
@@ -583,9 +548,7 @@ async def _handle_do_thing(call):
""",
"homeassistant.components.test_integration.services",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -610,9 +573,7 @@ async def _handle_reset(call):
""",
"homeassistant.components.test_integration.services",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
@@ -637,11 +598,9 @@ async def _handle_do_thing(call):
""",
"homeassistant.components.test_integration.services",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
def test_not_integration_module_ignored(
@@ -660,8 +619,6 @@ class MySwitch(SwitchEntity):
""",
"tests.components.test_integration.test_switch",
)
walker = ASTWalker(linter)
walker.add_checker(error_propagation_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, error_propagation_checker, root_node)
+6 -19
View File
@@ -8,10 +8,9 @@ from pathlib import Path
import astroid
from pylint.checkers import BaseChecker
from pylint.testutils.unittest_linter import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
import pytest
from tests.pylint import assert_no_messages
from tests.pylint import assert_no_messages, walk_checker
@pytest.mark.parametrize(
@@ -62,11 +61,9 @@ def test_enforce_config_flow_no_name(
) -> None:
"""Good test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_config_flow_no_name_checker, root_node)
@pytest.mark.parametrize(
@@ -110,10 +107,8 @@ def test_enforce_config_flow_no_name_bad(
) -> None:
"""Bad test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_name_checker)
walker.walk(root_node)
walk_checker(linter, enforce_config_flow_no_name_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-config-flow-name-field"
@@ -134,11 +129,9 @@ def test_enforce_config_flow_no_name_subentry_flow(
)
"""
root_node = astroid.parse(code, "homeassistant.components.test.config_flow")
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_config_flow_no_name_checker, root_node)
def test_enforce_config_flow_no_name_helper_integration(
@@ -160,11 +153,8 @@ def test_enforce_config_flow_no_name_helper_integration(
root_node = astroid.parse(code, "homeassistant.components.my_helper.config_flow")
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_config_flow_no_name_checker, root_node)
def test_enforce_config_flow_no_name_non_helper_integration(
@@ -184,10 +174,7 @@ def test_enforce_config_flow_no_name_non_helper_integration(
root_node = astroid.parse(code, "homeassistant.components.my_device.config_flow")
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_name_checker)
walker.walk(root_node)
walk_checker(linter, enforce_config_flow_no_name_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-config-flow-name-field"
+3 -8
View File
@@ -3,10 +3,9 @@
import astroid
from pylint.checkers import BaseChecker
from pylint.testutils.unittest_linter import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
import pytest
from tests.pylint import assert_no_messages
from tests.pylint import assert_no_messages, walk_checker
@pytest.mark.parametrize(
@@ -71,11 +70,9 @@ def test_enforce_config_flow_no_polling(
) -> None:
"""Good test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_polling_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_config_flow_no_polling_checker, root_node)
@pytest.mark.parametrize(
@@ -133,10 +130,8 @@ def test_enforce_config_flow_no_polling_bad(
) -> None:
"""Bad test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_flow_no_polling_checker)
walker.walk(root_node)
walk_checker(linter, enforce_config_flow_no_polling_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-config-flow-polling-field"
@@ -3,10 +3,9 @@
import astroid
from pylint.checkers import BaseChecker
from pylint.testutils.unittest_linter import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
import pytest
from tests.pylint import assert_no_messages
from tests.pylint import assert_no_messages, walk_checker
@pytest.mark.parametrize(
@@ -64,11 +63,9 @@ def test_enforce_unique_id_no_ip(
) -> None:
"""Good test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_entry_unique_id_no_ip_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_config_entry_unique_id_no_ip_checker, root_node)
@pytest.mark.parametrize(
@@ -121,10 +118,8 @@ def test_enforce_unique_id_no_ip_bad_call(
) -> None:
"""Bad async_set_unique_id call test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_entry_unique_id_no_ip_checker)
walker.walk(root_node)
walk_checker(linter, enforce_config_entry_unique_id_no_ip_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-unique-id-ip-based"
@@ -182,10 +177,8 @@ def test_enforce_unique_id_no_ip_bad_call_variable(
) -> None:
"""Bad async_set_unique_id call test cases."""
root_node = astroid.parse(code, module_name)
walker = ASTWalker(linter)
walker.add_checker(enforce_config_entry_unique_id_no_ip_checker)
walker.walk(root_node)
walk_checker(linter, enforce_config_entry_unique_id_no_ip_checker, root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-unique-id-ip-based"
@@ -4,7 +4,6 @@ from pathlib import Path
import astroid
from pylint.testutils import MessageTest, UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.quality_scale.config_entry_unloading import (
ConfigEntryUnloadingChecker,
)
@@ -12,7 +11,7 @@ from pylint_home_assistant.helpers.quality_scale import clear_quality_scale_cach
import pytest
import yaml
from tests.pylint import assert_adds_messages, assert_no_messages
from tests.pylint import assert_adds_messages, assert_no_messages, walk_checker
@pytest.fixture(name="unloading_checker")
@@ -56,11 +55,8 @@ async def async_unload_entry(hass, entry):
)
root_node.file = str(integration_dir / "__init__.py")
walker = ASTWalker(linter)
walker.add_checker(unloading_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, unloading_checker, root_node)
def test_unload_entry_missing_fires(
@@ -81,9 +77,6 @@ async def async_setup_entry(hass, entry):
)
root_node.file = str(integration_dir / "__init__.py")
walker = ASTWalker(linter)
walker.add_checker(unloading_checker)
with assert_adds_messages(
linter,
MessageTest(
@@ -93,7 +86,7 @@ async def async_setup_entry(hass, entry):
col_offset=0,
),
):
walker.walk(root_node)
walk_checker(linter, unloading_checker, root_node)
@pytest.mark.parametrize(
@@ -146,8 +139,5 @@ async def async_setup_entry(hass, entry):
)
root_node.file = str(integration_dir / "__init__.py")
walker = ASTWalker(linter)
walker.add_checker(unloading_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, unloading_checker, root_node)
+4 -14
View File
@@ -4,13 +4,12 @@ from pathlib import Path
import astroid
from pylint.testutils import MessageTest, UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.quality_scale.diagnostics import DiagnosticsChecker
from pylint_home_assistant.helpers.quality_scale import clear_quality_scale_cache
import pytest
import yaml
from tests.pylint import assert_adds_messages, assert_no_messages
from tests.pylint import assert_adds_messages, assert_no_messages, walk_checker
@pytest.fixture(name="diagnostics_checker")
@@ -75,11 +74,8 @@ def test_diagnostics_present(
root_node = astroid.parse(code, "homeassistant.components.test_int.diagnostics")
root_node.file = str(integration_dir / "diagnostics.py")
walker = ASTWalker(linter)
walker.add_checker(diagnostics_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, diagnostics_checker, root_node)
def test_diagnostics_missing_fires(
@@ -100,9 +96,6 @@ async def async_setup(hass, config):
)
root_node.file = str(integration_dir / "diagnostics.py")
walker = ASTWalker(linter)
walker.add_checker(diagnostics_checker)
with assert_adds_messages(
linter,
MessageTest(
@@ -112,7 +105,7 @@ async def async_setup(hass, config):
col_offset=0,
),
):
walker.walk(root_node)
walk_checker(linter, diagnostics_checker, root_node)
@pytest.mark.parametrize(
@@ -165,8 +158,5 @@ async def async_setup(hass, config):
)
root_node.file = str(integration_dir / "diagnostics.py")
walker = ASTWalker(linter)
walker.add_checker(diagnostics_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, diagnostics_checker, root_node)
@@ -6,7 +6,6 @@ from pathlib import Path
import astroid
from astroid import nodes
from pylint.testutils import MessageTest, UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.quality_scale.entity_unique_id import (
EntityUniqueIdChecker,
)
@@ -15,7 +14,7 @@ from pylint_home_assistant.helpers.quality_scale import clear_quality_scale_cach
import pytest
import yaml
from tests.pylint import assert_adds_messages, assert_no_messages
from tests.pylint import assert_adds_messages, assert_no_messages, walk_checker
@pytest.fixture(name="entity_unique_id_checker")
@@ -258,10 +257,8 @@ def test_handled(
_create_quality_scale(integration_dir, {"entity-unique-id": "done"})
root_node = _parse(code, integration_dir)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
@pytest.mark.parametrize(
@@ -351,10 +348,8 @@ def test_ancestor_satisfies_rule(
astroid.parse(ancestor_code, "homeassistant.components.test_integration.eui_entity")
root_node = _parse(sensor_code, integration_dir)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_missing_fires(
@@ -377,10 +372,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
@pytest.mark.parametrize(
@@ -444,10 +437,8 @@ def test_conditional_self_assignment_fires(
for cls in root_node.nodes_of_class(nodes.ClassDef)
if cls.name == class_name
)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_explicit_none_class_body_fires(
@@ -470,10 +461,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_subclass_nullifies_ancestor_value(
@@ -510,10 +499,8 @@ class MySensor(TestIntegrationBaseEntity):
for cls in root_node.nodes_of_class(nodes.ClassDef)
if cls.name == "MySensor"
)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_explicit_none_self_assign_fires(
@@ -537,10 +524,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_bare_annotation_only_fires(
@@ -563,10 +548,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_entity_default_does_not_satisfy(
@@ -596,10 +579,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
@pytest.mark.parametrize(
@@ -638,10 +619,8 @@ def test_class_not_subject_to_rule(
root_node = _parse(code, integration_dir)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_leaf_class_still_fires(
@@ -671,10 +650,8 @@ class SomethingUnrelated:
for cls in root_node.nodes_of_class(nodes.ClassDef)
if cls.name == "LonelySensor"
)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_dict_status_done_fires(
@@ -700,10 +677,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
@pytest.mark.parametrize(
@@ -773,10 +748,8 @@ class MySensor(Entity):
"""
root_node = _parse(code, integration_dir, module_name, file_name)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def _find_attr_value_node(
@@ -848,10 +821,8 @@ def test_static_class_body_string_in_multi_entry_fires(
root_node = _parse(code, integration_dir)
value_node = _find_attr_value_node(root_node)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_static(value_node, "MySensor")):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
def test_static_class_body_string_no_manifest_fires(
@@ -875,10 +846,8 @@ class MySensor(Entity):
)
value_node = _find_attr_value_node(root_node)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_adds_messages(linter, _expect_static(value_node, "MySensor")):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
_STATIC_CLASS_BODY_STRING = """
@@ -946,7 +915,5 @@ def test_static_rule_does_not_warn(
_create_quality_scale(integration_dir, {"entity-unique-id": rule_status})
root_node = _parse(code, integration_dir)
walker = ASTWalker(linter)
walker.add_checker(entity_unique_id_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, entity_unique_id_checker, root_node)
@@ -5,7 +5,6 @@ from pathlib import Path
import astroid
from astroid import nodes
from pylint.testutils import MessageTest, UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.quality_scale.has_entity_name import (
HasEntityNameChecker,
)
@@ -13,7 +12,7 @@ from pylint_home_assistant.helpers.quality_scale import clear_quality_scale_cach
import pytest
import yaml
from tests.pylint import assert_adds_messages, assert_no_messages
from tests.pylint import assert_adds_messages, assert_no_messages, walk_checker
@pytest.fixture(name="has_entity_name_checker")
@@ -145,10 +144,8 @@ def test_handled(
_create_quality_scale(integration_dir, {"has-entity-name": "done"})
root_node = _parse(code, integration_dir)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_ancestor_class_level(
@@ -180,10 +177,8 @@ class MySensor(TestIntegrationBaseEntity):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_ancestor_self_assign(
@@ -216,10 +211,8 @@ class MySensor(TestIntegrationBaseEntity):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_missing_fires(
@@ -242,10 +235,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
@pytest.mark.parametrize(
@@ -333,10 +324,8 @@ def test_conditional_self_assignment_fires(
for cls in root_node.nodes_of_class(nodes.ClassDef)
if cls.name == class_name
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_explicit_false_fires(
@@ -359,10 +348,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_generic_subscript_base_sets_flag(
@@ -394,10 +381,8 @@ class MySensor(TestIntegrationGenericBase[int]):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_two_level_subscript_chain(
@@ -438,10 +423,8 @@ class MyLight(TestIntegrationLightBase):
file_name="light.py",
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_entity_description_fallback(
@@ -471,10 +454,8 @@ class MyEntity(Entity):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_entity_description_subscripted_annotation(
@@ -504,10 +485,8 @@ class MyEntity[T](Entity):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_entity_description_without_flag_still_fires(
@@ -542,10 +521,8 @@ class MyEntity(Entity):
for cls in root_node.nodes_of_class(nodes.ClassDef)
if cls.name == "MyEntity"
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_entity_description_set_in_ancestor(
@@ -585,10 +562,8 @@ class MySensor(TestIntegrationBaseEntity):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_mixin_subclassed_in_same_module_ignored(
@@ -613,10 +588,8 @@ class ActualEntity(MyClimateMixin):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_subclassed_via_subscript_ignored(
@@ -641,10 +614,8 @@ class ConcreteEntity(GenericBase[int]):
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_leaf_class_still_fires(
@@ -674,10 +645,8 @@ class SomethingUnrelated:
for cls in root_node.nodes_of_class(nodes.ClassDef)
if cls.name == "LonelySensor"
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_non_entity_class_ignored(
@@ -697,10 +666,8 @@ class NotAnEntity:
integration_dir,
)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
def test_dict_status_done_fires(
@@ -726,10 +693,8 @@ class MySensor(Entity):
)
class_node = next(root_node.nodes_of_class(nodes.ClassDef))
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_adds_messages(linter, _expect_missing(class_node)):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
@pytest.mark.parametrize(
@@ -799,7 +764,5 @@ class MySensor(Entity):
"""
root_node = _parse(code, integration_dir, module_name, file_name)
walker = ASTWalker(linter)
walker.add_checker(has_entity_name_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, has_entity_name_checker, root_node)
@@ -4,7 +4,6 @@ from pathlib import Path
import astroid
from pylint.testutils import MessageTest, UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.quality_scale.parallel_updates import (
ParallelUpdatesChecker,
)
@@ -12,7 +11,7 @@ from pylint_home_assistant.helpers.quality_scale import clear_quality_scale_cach
import pytest
import yaml
from tests.pylint import assert_adds_messages, assert_no_messages
from tests.pylint import assert_adds_messages, assert_no_messages, walk_checker
@pytest.fixture(name="parallel_updates_checker")
@@ -61,11 +60,8 @@ def test_parallel_updates_present(
root_node = astroid.parse("PARALLEL_UPDATES = 1\n", module_name)
root_node.file = str(integration_dir / "sensor.py")
walker = ASTWalker(linter)
walker.add_checker(parallel_updates_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, parallel_updates_checker, root_node)
def test_parallel_updates_zero(
@@ -82,11 +78,8 @@ def test_parallel_updates_zero(
)
root_node.file = str(integration_dir / "sensor.py")
walker = ASTWalker(linter)
walker.add_checker(parallel_updates_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, parallel_updates_checker, root_node)
def test_parallel_updates_annotated_assignment(
@@ -104,11 +97,8 @@ def test_parallel_updates_annotated_assignment(
)
root_node.file = str(integration_dir / "sensor.py")
walker = ASTWalker(linter)
walker.add_checker(parallel_updates_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, parallel_updates_checker, root_node)
def test_parallel_updates_missing_fires(
@@ -126,9 +116,6 @@ def test_parallel_updates_missing_fires(
)
root_node.file = str(integration_dir / "sensor.py")
walker = ASTWalker(linter)
walker.add_checker(parallel_updates_checker)
with assert_adds_messages(
linter,
MessageTest(
@@ -138,7 +125,7 @@ def test_parallel_updates_missing_fires(
col_offset=0,
),
):
walker.walk(root_node)
walk_checker(linter, parallel_updates_checker, root_node)
def test_parallel_updates_missing_status_done_dict(
@@ -159,9 +146,6 @@ def test_parallel_updates_missing_status_done_dict(
)
root_node.file = str(integration_dir / "sensor.py")
walker = ASTWalker(linter)
walker.add_checker(parallel_updates_checker)
with assert_adds_messages(
linter,
MessageTest(
@@ -171,7 +155,7 @@ def test_parallel_updates_missing_status_done_dict(
col_offset=0,
),
):
walker.walk(root_node)
walk_checker(linter, parallel_updates_checker, root_node)
@pytest.mark.parametrize(
@@ -231,8 +215,5 @@ def test_parallel_updates_not_fired(
)
root_node.file = str(integration_dir / "sensor.py")
walker = ASTWalker(linter)
walker.add_checker(parallel_updates_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, parallel_updates_checker, root_node)
@@ -4,7 +4,6 @@ from pathlib import Path
import astroid
from pylint.testutils import MessageTest, UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.quality_scale.reauthentication_flow import (
ReauthenticationFlowChecker,
)
@@ -12,7 +11,7 @@ from pylint_home_assistant.helpers.quality_scale import clear_quality_scale_cach
import pytest
import yaml
from tests.pylint import assert_adds_messages, assert_no_messages
from tests.pylint import assert_adds_messages, assert_no_messages, walk_checker
@pytest.fixture(name="reauth_checker")
@@ -54,11 +53,8 @@ class MyConfigFlow(ConfigFlow, domain=DOMAIN):
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(reauth_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, reauth_checker, root_node)
def test_reauth_missing_fires(
@@ -80,9 +76,6 @@ class MyConfigFlow(ConfigFlow, domain=DOMAIN):
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(reauth_checker)
with assert_adds_messages(
linter,
MessageTest(
@@ -92,7 +85,7 @@ class MyConfigFlow(ConfigFlow, domain=DOMAIN):
col_offset=0,
),
):
walker.walk(root_node)
walk_checker(linter, reauth_checker, root_node)
@pytest.mark.parametrize(
@@ -146,8 +139,5 @@ class MyConfigFlow(ConfigFlow, domain=DOMAIN):
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(reauth_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, reauth_checker, root_node)
+7 -20
View File
@@ -5,10 +5,9 @@ from pylint.checkers import BaseChecker
from pylint.interfaces import UNDEFINED
from pylint.testutils import MessageTest
from pylint.testutils.unittest_linter import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
import pytest
from . import assert_adds_messages, assert_no_messages
from . import assert_adds_messages, assert_no_messages, walk_checker
@pytest.mark.parametrize(
@@ -54,11 +53,9 @@ def test_enforce_class_module_good(
) -> None:
"""Good test cases."""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(enforce_class_module_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_class_module_checker, root_node)
@pytest.mark.parametrize(
@@ -90,11 +87,9 @@ def test_enforce_class_platform_good(
pass
"""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(enforce_class_module_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_class_module_checker, root_node)
@pytest.mark.parametrize(
@@ -128,8 +123,6 @@ def test_enforce_class_module_bad_simple(
""",
path,
)
walker = ASTWalker(linter)
walker.add_checker(enforce_class_module_checker)
with assert_adds_messages(
linter,
@@ -154,7 +147,7 @@ def test_enforce_class_module_bad_simple(
end_col_offset=35,
),
):
walker.walk(root_node)
walk_checker(linter, enforce_class_module_checker, root_node)
@pytest.mark.parametrize(
@@ -185,8 +178,6 @@ def test_enforce_class_module_bad_nested(
""",
path,
)
walker = ASTWalker(linter)
walker.add_checker(enforce_class_module_checker)
with assert_adds_messages(
linter,
@@ -211,7 +202,7 @@ def test_enforce_class_module_bad_nested(
end_col_offset=21,
),
):
walker.walk(root_node)
walk_checker(linter, enforce_class_module_checker, root_node)
@pytest.mark.parametrize(
@@ -236,11 +227,9 @@ def test_enforce_entity_good(
pass
"""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(enforce_class_module_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, enforce_class_module_checker, root_node)
@pytest.mark.parametrize(
@@ -265,8 +254,6 @@ def test_enforce_entity_bad(
pass
"""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(enforce_class_module_checker)
with assert_adds_messages(
linter,
@@ -281,4 +268,4 @@ def test_enforce_entity_bad(
end_col_offset=18,
),
):
walker.walk(root_node)
walk_checker(linter, enforce_class_module_checker, root_node)
+7 -20
View File
@@ -5,10 +5,9 @@ from pylint.checkers import BaseChecker
from pylint.interfaces import UNDEFINED
from pylint.testutils import MessageTest
from pylint.testutils.unittest_linter import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
import pytest
from . import assert_adds_messages, assert_no_messages
from . import assert_adds_messages, assert_no_messages, walk_checker
def test_good_callback(linter: UnittestLinter, decorator_checker: BaseChecker) -> None:
@@ -24,11 +23,9 @@ def test_good_callback(linter: UnittestLinter, decorator_checker: BaseChecker) -
"""
root_node = astroid.parse(code)
walker = ASTWalker(linter)
walker.add_checker(decorator_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, decorator_checker, root_node)
def test_bad_callback(linter: UnittestLinter, decorator_checker: BaseChecker) -> None:
@@ -44,8 +41,6 @@ def test_bad_callback(linter: UnittestLinter, decorator_checker: BaseChecker) ->
"""
root_node = astroid.parse(code)
walker = ASTWalker(linter)
walker.add_checker(decorator_checker)
with assert_adds_messages(
linter,
@@ -60,7 +55,7 @@ def test_bad_callback(linter: UnittestLinter, decorator_checker: BaseChecker) ->
end_col_offset=15,
),
):
walker.walk(root_node)
walk_checker(linter, decorator_checker, root_node)
@pytest.mark.parametrize(
@@ -110,11 +105,9 @@ def test_good_fixture(
"""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(decorator_checker)
with assert_no_messages(linter):
walker.walk(root_node)
walk_checker(linter, decorator_checker, root_node)
@pytest.mark.parametrize(
@@ -146,8 +139,6 @@ def test_bad_fixture_session_scope(
"""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(decorator_checker)
with assert_adds_messages(
linter,
@@ -162,7 +153,7 @@ def test_bad_fixture_session_scope(
end_col_offset=32,
),
):
walker.walk(root_node)
walk_checker(linter, decorator_checker, root_node)
@pytest.mark.parametrize(
@@ -193,8 +184,6 @@ def test_bad_fixture_package_scope(
"""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(decorator_checker)
with assert_adds_messages(
linter,
@@ -209,7 +198,7 @@ def test_bad_fixture_package_scope(
end_col_offset=32,
),
):
walker.walk(root_node)
walk_checker(linter, decorator_checker, root_node)
@pytest.mark.parametrize(
@@ -247,8 +236,6 @@ def test_bad_fixture_autouse(
"""
root_node = astroid.parse(code, path)
walker = ASTWalker(linter)
walker.add_checker(decorator_checker)
with assert_adds_messages(
linter,
@@ -263,4 +250,4 @@ def test_bad_fixture_autouse(
end_col_offset=17 + len(keywords),
),
):
walker.walk(root_node)
walk_checker(linter, decorator_checker, root_node)

Some files were not shown because too many files have changed in this diff Show More