Compare commits

...

4 Commits

Author SHA1 Message Date
Franck Nijhof 5fa4e09127 Merge branch 'dev' into frenck-2026-0609 2026-05-21 17:19:54 +02:00
Franck Nijhof cf2ee1b32f Address copilot feedback 2026-05-21 14:36:02 +00:00
Franck Nijhof 93f60c5594 Address copilot feedback 2026-05-21 12:02:25 +00:00
Franck Nijhof a190117a9a Add pylint checker for missing flow form translations 2026-05-21 11:18:18 +00:00
19 changed files with 1188 additions and 0 deletions
@@ -173,6 +173,7 @@ class BlinkConfigFlow(ConfigFlow, domain=DOMAIN):
errors["base"] = "unknown"
config_entry = self._get_reconfigure_entry()
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="reconfigure",
data_schema=vol.Schema(
@@ -31,6 +31,7 @@ class DownloaderConfigFlow(ConfigFlow, domain=DOMAIN):
else:
return self.async_create_entry(title=DEFAULT_NAME, data=user_input)
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -52,4 +52,5 @@ class EGPSConfigFlow(ConfigFlow, domain=DOMAIN):
else:
return self.async_abort(reason="no_device")
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(step_id="user", data_schema=vol.Schema(data_schema))
@@ -277,6 +277,7 @@ class FluxLedConfigFlow(ConfigFlow, domain=DOMAIN):
# Check if there is at least one device
if not devices_name:
return self.async_abort(reason="no_devices_found")
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="pick_device",
data_schema=vol.Schema({vol.Required(CONF_DEVICE): vol.In(devices_name)}),
@@ -125,6 +125,7 @@ class Gogogate2FlowHandler(ConfigFlow, domain=DOMAIN):
CONF_DEVICE: DEVICE_NAMES[self._device_type],
CONF_IP_ADDRESS: self._ip_address,
}
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -140,6 +140,7 @@ class HueFlowHandler(ConfigFlow, domain=DOMAIN):
if not self.discovered_bridges:
return await self.async_step_manual()
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="init",
data_schema=vol.Schema(
@@ -75,6 +75,7 @@ class IslamicPrayerFlowHandler(ConfigFlow, domain=DOMAIN):
CONF_LATITUDE: self.hass.config.latitude,
CONF_LONGITUDE: self.hass.config.longitude,
}
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -100,6 +100,7 @@ class LaCrosseViewConfigFlow(ConfigFlow, domain=DOMAIN):
if not user_input:
_LOGGER.debug("Showing initial location selection")
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="location",
data_schema=vol.Schema(
@@ -75,6 +75,7 @@ class LookinFlowHandler(ConfigFlow, domain=DOMAIN):
data={CONF_HOST: host},
)
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required(CONF_HOST): str}),
@@ -98,6 +98,7 @@ class MelnorConfigFlow(ConfigFlow, domain=DOMAIN):
if not addresses:
return self.async_abort(reason="no_devices_found")
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="pick_device",
data_schema=vol.Schema({vol.Required(CONF_ADDRESS): vol.In(addresses)}),
@@ -24,6 +24,7 @@ class MeteoclimaticFlowHandler(ConfigFlow, domain=DOMAIN):
if user_input is None:
user_input = {}
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -117,6 +117,7 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
},
)
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="confirm",
data_schema=vol.Schema(
@@ -206,6 +206,7 @@ class PlaatoOptionsFlowHandler(OptionsFlow):
if user_input is not None:
return self.async_create_entry(title="", data=user_input)
# pylint: disable-next=home-assistant-options-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -64,6 +64,7 @@ class SimpleFinConfigFlow(ConfigFlow, domain=DOMAIN):
data={CONF_ACCESS_URL: user_input[CONF_ACCESS_URL]},
)
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -66,6 +66,7 @@ class SmhiFlowHandler(ConfigFlow, domain=DOMAIN):
CONF_LATITUDE: self.hass.config.latitude,
CONF_LONGITUDE: self.hass.config.longitude,
}
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -110,6 +110,7 @@ class SnoozConfigFlow(ConfigFlow, domain=DOMAIN):
if not self._discovered_devices:
return self.async_abort(reason="no_devices_found")
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="user",
data_schema=vol.Schema(
@@ -75,6 +75,7 @@ class TuyaConfigFlow(ConfigFlow, domain=DOMAIN):
) -> ConfigFlowResult:
"""Step scan."""
if user_input is None:
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="scan",
data_schema=vol.Schema(
@@ -99,6 +100,7 @@ class TuyaConfigFlow(ConfigFlow, domain=DOMAIN):
if not ret:
# Try to get a new QR code on failure
await self.__async_get_qr_code(self.__user_code)
# pylint: disable-next=home-assistant-config-flow-field-not-translated
return self.async_show_form(
step_id="scan",
errors={"base": "login_error"},
@@ -0,0 +1,439 @@
"""Checker for missing config/options/subentry flow form translations.
When a flow calls ``async_show_form`` with a ``data_schema``, every
field in the schema needs a corresponding translation entry in
``strings.json``.
The expected translation paths are::
config.step.{step_id}.data.{field_name}
config.step.{step_id}.sections.{section_key}.data.{field_name}
options.step.{step_id}.data.{field_name}
options.step.{step_id}.sections.{section_key}.data.{field_name}
config_subentries.{subentry_type}.step.{step_id}.data.{field_name}
- ``W7420``: Missing config flow field translation
- ``W7421``: Missing options flow field translation
- ``W7422``: Missing subentry flow field translation
"""
import astroid
from astroid import nodes
from pylint.checkers import BaseChecker
from pylint.lint import PyLinter
from pylint_home_assistant.helpers.module_info import get_module_platform
from pylint_home_assistant.helpers.translations import load_translations
_InferenceError = astroid.exceptions.InferenceError
def _extract_step_id(call: nodes.Call) -> str | None:
"""Extract step_id from an async_show_form call.
Falls back to the enclosing method name if step_id is omitted
(e.g., ``async_step_user`` -> ``"user"``).
"""
for kw in call.keywords:
if kw.arg == "step_id":
if isinstance(kw.value, nodes.Const) and isinstance(kw.value.value, str):
return kw.value.value
try:
for inferred in kw.value.infer():
if isinstance(inferred, nodes.Const) and isinstance(
inferred.value, str
):
return str(inferred.value)
except _InferenceError:
pass
return None
# No step_id keyword: infer from enclosing async_step_* method
current = call.parent
while current is not None:
if isinstance(current, nodes.FunctionDef):
if current.name.startswith("async_step_"):
return str(current.name).removeprefix("async_step_")
break
current = current.parent
return None
# Result types for schema extraction
class _Field:
"""A regular form field."""
__slots__ = ("name",)
def __init__(self, name: str) -> None:
self.name = name
class _Section:
"""A section containing nested fields."""
__slots__ = ("fields", "key")
def __init__(self, key: str, fields: list[str]) -> None:
self.key = key
self.fields = fields
def _extract_schema_items(call: nodes.Call) -> list[_Field | _Section]:
"""Extract fields and sections from the data_schema keyword argument."""
schema_node = None
for kw in call.keywords:
if kw.arg == "data_schema":
schema_node = kw.value
break
if schema_node is None:
return []
return _extract_items_from_node(schema_node)
def _extract_items_from_node(
node: nodes.NodeNG,
) -> list[_Field | _Section]:
"""Recursively extract fields and sections from a schema node."""
items: list[_Field | _Section] = []
# vol.Schema({...}) or similar call wrapping a dict
if isinstance(node, nodes.Call) and node.args:
return _extract_items_from_node(node.args[0])
# Direct dict literal
if isinstance(node, nodes.Dict):
for key, value in node.items:
if isinstance(key, nodes.DictUnpack):
try:
for inferred in value.infer():
if isinstance(inferred, nodes.Dict):
items.extend(_extract_items_from_node(inferred))
except _InferenceError:
pass
continue
name = _resolve_field_name(key)
if name is None:
continue
# Check if the value is a section(...) call
section_fields = _extract_section_fields(value)
if section_fields is not None:
items.append(_Section(name, section_fields))
else:
items.append(_Field(name))
return items
# Variable reference: try to infer to a Call or Dict
try:
for inferred in node.infer():
if isinstance(inferred, (nodes.Call, nodes.Dict)):
return _extract_items_from_node(inferred)
except _InferenceError:
pass
return items
def _extract_section_fields(node: nodes.NodeNG) -> list[str] | None:
"""Extract field names from a section(...) call.
Returns a list of field names if the node is a section() call,
or None if it's not a section.
"""
if not isinstance(node, nodes.Call):
return None
# Match section(...) or data_entry_flow.section(...)
if isinstance(node.func, nodes.Name):
if node.func.name != "section":
return None
elif isinstance(node.func, nodes.Attribute):
if node.func.attrname != "section":
return None
else:
return None
if not node.args:
return None
# section(vol.Schema({...}), ...) - first arg is the schema
inner_items = _extract_items_from_node(node.args[0])
return [item.name for item in inner_items if isinstance(item, _Field)]
def _resolve_field_name(node: nodes.NodeNG) -> str | None:
"""Resolve a schema key to a field name string."""
if isinstance(node, nodes.Call) and isinstance(node.func, nodes.Attribute):
if node.func.attrname in ("Required", "Optional") and node.args:
return _resolve_field_name(node.args[0])
if isinstance(node, nodes.Const) and isinstance(node.value, str):
return node.value
try:
for inferred in node.infer():
if isinstance(inferred, nodes.Const) and isinstance(inferred.value, str):
return inferred.value
except _InferenceError:
pass
return None
def _find_enclosing_class(node: nodes.Call) -> nodes.ClassDef | None:
"""Find the enclosing class definition for a call node."""
current = node.parent
while current is not None:
if isinstance(current, nodes.ClassDef):
return current
current = current.parent
return None
def _get_flow_type(class_node: nodes.ClassDef) -> str | None:
"""Determine flow type from a class definition."""
for base in class_node.bases:
match base:
case nodes.Name(name=name):
if "SubentryFlow" in name:
return "subentry"
if "OptionsFlow" in name:
return "options"
if "ConfigFlow" in name or "FlowHandler" in name:
return "config"
case nodes.Attribute(attrname=name):
if "SubentryFlow" in name:
return "subentry"
if "OptionsFlow" in name:
return "options"
if "ConfigFlow" in name or "FlowHandler" in name:
return "config"
try:
for ancestor in class_node.ancestors():
if "SubentryFlow" in ancestor.name:
return "subentry"
if "OptionsFlow" in ancestor.name:
return "options"
if "ConfigFlow" in ancestor.name:
return "config"
except _InferenceError:
pass
return None
def _resolve_subentry_types(module: nodes.Module, handler_class_name: str) -> list[str]:
"""Resolve subentry type names for a handler class."""
subentry_types: list[str] = []
for node in module.body:
if not isinstance(node, nodes.ClassDef):
continue
if _get_flow_type(node) != "config":
continue
for method in node.mymethods():
if method.name != "async_get_supported_subentry_types":
continue
for child in method.body:
if not isinstance(child, nodes.Return):
continue
if not isinstance(child.value, nodes.Dict):
continue
for key, value in child.value.items:
if isinstance(key, nodes.DictUnpack):
continue
key_str = _resolve_field_name(key)
if key_str and isinstance(value, nodes.Name):
if value.name == handler_class_name:
subentry_types.append(key_str)
return subentry_types
class ConfigFlowTranslationsChecker(BaseChecker):
"""Checker for missing flow form translations."""
name = "home_assistant_config_flow_translations"
priority = -1
msgs = {
"W7420": (
"Form field '%s' in step '%s' is missing a translation in "
"strings.json (expected at %s)",
"home-assistant-config-flow-field-not-translated",
"Used when a config flow form field does not have a "
"corresponding translation in strings.json.",
),
"W7421": (
"Form field '%s' in step '%s' is missing a translation in "
"strings.json (expected at %s)",
"home-assistant-options-flow-field-not-translated",
"Used when an options flow form field does not have a "
"corresponding translation in strings.json.",
),
"W7422": (
"Form field '%s' in step '%s' is missing a translation in "
"strings.json (expected at %s)",
"home-assistant-subentry-flow-field-not-translated",
"Used when a subentry flow form field does not have a "
"corresponding translation in strings.json.",
),
}
options = ()
_translations: dict | None
_is_config_flow: bool
_module_node: nodes.Module | None
def visit_module(self, node: nodes.Module) -> None:
"""Load translations for config_flow modules."""
platform = get_module_platform(node.name)
self._is_config_flow = platform == "config_flow"
self._translations = None
self._module_node = None
if self._is_config_flow:
self._translations = load_translations(node)
self._module_node = node
def visit_call(self, node: nodes.Call) -> None:
"""Check async_show_form calls for translated fields."""
if not self._is_config_flow or self._translations is None:
return
if not isinstance(node.func, nodes.Attribute):
return
if node.func.attrname != "async_show_form":
return
step_id = _extract_step_id(node)
if step_id is None:
return
schema_items = _extract_schema_items(node)
if not schema_items:
return
class_node = _find_enclosing_class(node)
if class_node is None:
return
flow_type = _get_flow_type(class_node) or "config"
if flow_type == "config":
self._check_flow(
node,
step_id,
schema_items,
"config",
"home-assistant-config-flow-field-not-translated",
)
elif flow_type == "options":
self._check_flow(
node,
step_id,
schema_items,
"options",
"home-assistant-options-flow-field-not-translated",
)
elif flow_type == "subentry":
self._check_subentry_flow(node, step_id, schema_items, class_node)
def _check_flow(
self,
node: nodes.Call,
step_id: str,
schema_items: list[_Field | _Section],
flow_key: str,
msg_id: str,
) -> None:
"""Check fields against translations for config/options flows."""
assert self._translations is not None
step_trans = (
self._translations.get(flow_key, {}).get("step", {}).get(step_id, {})
)
data_trans = step_trans.get("data", {})
sections_trans = step_trans.get("sections", {})
for item in schema_items:
if isinstance(item, _Field):
if item.name not in data_trans:
path = f"{flow_key}.step.{step_id}.data.{item.name}"
self.add_message(
msg_id,
node=node,
args=(item.name, step_id, path),
)
elif isinstance(item, _Section):
section_data = sections_trans.get(item.key, {}).get("data", {})
for field in item.fields:
if field not in section_data:
path = f"{flow_key}.step.{step_id}.sections.{item.key}.data.{field}"
self.add_message(
msg_id,
node=node,
args=(field, step_id, path),
)
def _check_subentry_flow(
self,
node: nodes.Call,
step_id: str,
schema_items: list[_Field | _Section],
class_node: nodes.ClassDef,
) -> None:
"""Check subentry flow fields against translations."""
if self._module_node is None:
return
subentry_types = _resolve_subentry_types(self._module_node, class_node.name)
if not subentry_types:
return
assert self._translations is not None
config_subentries = self._translations.get("config_subentries", {})
sub0 = subentry_types[0]
for item in schema_items:
if isinstance(item, _Section):
for field in item.fields:
if not any(
field
in config_subentries.get(st, {})
.get("step", {})
.get(step_id, {})
.get("sections", {})
.get(item.key, {})
.get("data", {})
for st in subentry_types
):
path = f"config_subentries.{sub0}.step.{step_id}.sections.{item.key}.data.{field}"
self.add_message(
"home-assistant-subentry-flow-field-not-translated",
node=node,
args=(field, step_id, path),
)
elif isinstance(item, _Field):
if not any(
item.name
in config_subentries.get(st, {})
.get("step", {})
.get(step_id, {})
.get("data", {})
for st in subentry_types
):
path = f"config_subentries.{sub0}.step.{step_id}.data.{item.name}"
self.add_message(
"home-assistant-subentry-flow-field-not-translated",
node=node,
args=(item.name, step_id, path),
)
def register(linter: PyLinter) -> None:
"""Register the checker."""
linter.register_checker(ConfigFlowTranslationsChecker(linter))
+731
View File
@@ -0,0 +1,731 @@
"""Tests for the config flow translations checker."""
import json
from pathlib import Path
import astroid
from pylint.testutils import UnittestLinter
from pylint.utils.ast_walker import ASTWalker
from pylint_home_assistant.checkers.flow_translations import (
ConfigFlowTranslationsChecker,
)
from pylint_home_assistant.helpers.translations import clear_translations_cache
import pytest
from . import assert_no_messages
@pytest.fixture(name="flow_translations_checker")
def flow_translations_checker_fixture(
linter: UnittestLinter,
) -> ConfigFlowTranslationsChecker:
"""Fixture to provide a config flow translations checker."""
clear_translations_cache()
return ConfigFlowTranslationsChecker(linter)
def _make_integration(tmp_path: Path, strings: dict | None = None) -> Path:
"""Create a fake integration with optional strings.json."""
integration_dir = tmp_path / "homeassistant" / "components" / "test_int"
integration_dir.mkdir(parents=True)
if strings is not None:
(integration_dir / "strings.json").write_text(json.dumps(strings))
return integration_dir
# --- Config flow tests ---
def test_config_flow_translated_ok(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""No warning when all config flow fields have translations."""
integration_dir = _make_integration(
tmp_path,
{"config": {"step": {"user": {"data": {"host": "Host", "port": "Port"}}}}},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("host"): str,
vol.Optional("port"): int,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_config_flow_missing_field_flagged(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Warning when a config flow field is missing a translation."""
integration_dir = _make_integration(
tmp_path,
{"config": {"step": {"user": {"data": {"host": "Host"}}}}},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("host"): str,
vol.Required("missing_field"): str,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
walker.walk(root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-config-flow-field-not-translated"
assert "missing_field" in messages[0].args[0]
# --- Options flow tests ---
def test_options_flow_translated_ok(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""No warning when options flow fields have translations."""
integration_dir = _make_integration(
tmp_path,
{"options": {"step": {"init": {"data": {"interval": "Update interval"}}}}},
)
root_node = astroid.parse(
"""
class MyOptionsFlow(OptionsFlow):
async def async_step_init(self, user_input=None):
return self.async_show_form(
step_id="init",
data_schema=vol.Schema({
vol.Required("interval"): int,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_options_flow_missing_field_flagged(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Warning when an options flow field is missing a translation."""
integration_dir = _make_integration(
tmp_path,
{"options": {"step": {"init": {"data": {"interval": "Update interval"}}}}},
)
root_node = astroid.parse(
"""
class MyOptionsFlow(OptionsFlow):
async def async_step_init(self, user_input=None):
return self.async_show_form(
step_id="init",
data_schema=vol.Schema({
vol.Required("interval"): int,
vol.Required("missing"): str,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
walker.walk(root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-options-flow-field-not-translated"
# --- Subentry flow tests ---
def test_subentry_flow_translated_ok(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""No warning when subentry flow fields have translations."""
integration_dir = _make_integration(
tmp_path,
{
"config_subentries": {
"my_sub": {
"step": {"user": {"data": {"name": "Name"}}},
}
}
},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
@classmethod
def async_get_supported_subentry_types(cls, config_entry):
return {"my_sub": MySubentryFlow}
class MySubentryFlow(ConfigSubentryFlow):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("name"): str,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_subentry_flow_missing_field_flagged(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Warning when a subentry flow field is missing a translation."""
integration_dir = _make_integration(
tmp_path,
{
"config_subentries": {
"my_sub": {
"step": {"user": {"data": {"name": "Name"}}},
}
}
},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
@classmethod
def async_get_supported_subentry_types(cls, config_entry):
return {"my_sub": MySubentryFlow}
class MySubentryFlow(ConfigSubentryFlow):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("name"): str,
vol.Required("missing"): str,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
walker.walk(root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-subentry-flow-field-not-translated"
def test_subentry_shared_handler_any_type_ok(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""No warning when field exists in any of the mapped subentry types."""
integration_dir = _make_integration(
tmp_path,
{
"config_subentries": {
"type_a": {
"step": {"user": {"data": {"name": "Name"}}},
},
"type_b": {
"step": {"user": {"data": {}}},
},
}
},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
@classmethod
def async_get_supported_subentry_types(cls, config_entry):
return {"type_a": SharedFlow, "type_b": SharedFlow}
class SharedFlow(ConfigSubentryFlow):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("name"): str,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
# --- Inference tests ---
def test_implicit_step_id_from_method(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that step_id is inferred from async_step_* method name."""
integration_dir = _make_integration(
tmp_path,
{"config": {"step": {"user": {"data": {"host": "Host"}}}}},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
data_schema=vol.Schema({vol.Required("host"): str}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_implicit_step_id_missing_field_flagged(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that missing field is flagged when step_id is inferred."""
integration_dir = _make_integration(
tmp_path,
{"config": {"step": {"user": {"data": {"host": "Host"}}}}},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
data_schema=vol.Schema({
vol.Required("host"): str,
vol.Required("missing"): str,
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
walker.walk(root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "home-assistant-config-flow-field-not-translated"
assert "missing" in messages[0].args[0]
def test_section_fields_translated_ok(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that section fields are checked against sections translations."""
integration_dir = _make_integration(
tmp_path,
{
"config": {
"step": {
"user": {
"data": {"host": "Host"},
"sections": {
"advanced": {
"data": {"ssl": "Use SSL", "verify": "Verify"},
}
},
}
}
}
},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("host"): str,
vol.Required("advanced"): section(
vol.Schema({
vol.Required("ssl"): bool,
vol.Required("verify"): bool,
}),
{"collapsed": True},
),
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_section_missing_field_flagged(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that missing section fields are flagged."""
integration_dir = _make_integration(
tmp_path,
{
"config": {
"step": {
"user": {
"data": {"host": "Host"},
"sections": {
"advanced": {
"data": {"ssl": "Use SSL"},
}
},
}
}
}
},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("host"): str,
vol.Required("advanced"): section(
vol.Schema({
vol.Required("ssl"): bool,
vol.Required("missing_field"): bool,
}),
{"collapsed": True},
),
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
walker.walk(root_node)
messages = linter.release_messages()
assert len(messages) == 1
assert "missing_field" in messages[0].args[0]
def test_section_attribute_form_ok(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that data_entry_flow.section(...) is recognized as a section."""
integration_dir = _make_integration(
tmp_path,
{
"config": {
"step": {
"user": {
"data": {"host": "Host"},
"sections": {
"advanced": {
"data": {"ssl": "Use SSL"},
}
},
}
}
}
},
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({
vol.Required("host"): str,
vol.Required("advanced"): data_entry_flow.section(
vol.Schema({vol.Required("ssl"): bool}),
{"collapsed": True},
),
}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_constant_field_names_resolved(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that constant field names are resolved via inference."""
integration_dir = _make_integration(
tmp_path,
{"config": {"step": {"user": {"data": {"host": "Host"}}}}},
)
root_node = astroid.parse(
"""
CONF_HOST = "host"
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required(CONF_HOST): str}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_dict_unpacking_in_schema(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that **dict unpacking in schema dicts is resolved."""
integration_dir = _make_integration(
tmp_path,
{"config": {"step": {"user": {"data": {"host": "Host", "port": "Port"}}}}},
)
root_node = astroid.parse(
"""
BASE = {vol.Required("host"): str}
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({**BASE, vol.Optional("port"): int}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_schema_variable_resolved(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Test that schema passed via a variable is resolved."""
integration_dir = _make_integration(
tmp_path,
{"config": {"step": {"user": {"data": {"host": "Host"}}}}},
)
root_node = astroid.parse(
"""
USER_SCHEMA = vol.Schema({vol.Required("host"): str})
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(step_id="user", data_schema=USER_SCHEMA)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
# --- Edge cases ---
def test_no_strings_json_no_warning(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""No warning when strings.json doesn't exist."""
integration_dir = _make_integration(tmp_path)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required("host"): str}),
)
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_not_config_flow_module_ignored(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""Non-config_flow modules are ignored."""
integration_dir = _make_integration(
tmp_path, {"config": {"step": {"user": {"data": {}}}}}
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
return self.async_show_form(
step_id="user",
data_schema=vol.Schema({vol.Required("host"): str}),
)
""",
"homeassistant.components.test_int.sensor",
)
root_node.file = str(integration_dir / "sensor.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)
def test_no_data_schema_no_warning(
linter: UnittestLinter,
flow_translations_checker: ConfigFlowTranslationsChecker,
tmp_path: Path,
) -> None:
"""No warning when async_show_form has no data_schema."""
integration_dir = _make_integration(
tmp_path, {"config": {"step": {"link": {"title": "Link"}}}}
)
root_node = astroid.parse(
"""
class MyConfigFlow(ConfigFlow, domain=DOMAIN):
async def async_step_link(self, user_input=None):
return self.async_show_form(step_id="link")
""",
"homeassistant.components.test_int.config_flow",
)
root_node.file = str(integration_dir / "config_flow.py")
walker = ASTWalker(linter)
walker.add_checker(flow_translations_checker)
with assert_no_messages(linter):
walker.walk(root_node)