mirror of
https://github.com/home-assistant/core.git
synced 2025-06-25 01:21:51 +02:00
Add missing type annotations to Airvisual (#52615)
This commit is contained in:
@ -9,6 +9,7 @@ homeassistant.components.actiontec.*
|
||||
homeassistant.components.aftership.*
|
||||
homeassistant.components.air_quality.*
|
||||
homeassistant.components.airly.*
|
||||
homeassistant.components.airvisual.*
|
||||
homeassistant.components.aladdin_connect.*
|
||||
homeassistant.components.alarm_control_panel.*
|
||||
homeassistant.components.amazon_polly.*
|
||||
|
@ -1,6 +1,10 @@
|
||||
"""The airvisual component."""
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Mapping, MutableMapping
|
||||
from datetime import timedelta
|
||||
from math import ceil
|
||||
from typing import Any, Dict, cast
|
||||
|
||||
from pyairvisual import CloudAPI, NodeSamba
|
||||
from pyairvisual.errors import (
|
||||
@ -10,6 +14,7 @@ from pyairvisual.errors import (
|
||||
NodeProError,
|
||||
)
|
||||
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
ATTR_ATTRIBUTION,
|
||||
CONF_API_KEY,
|
||||
@ -20,7 +25,7 @@ from homeassistant.const import (
|
||||
CONF_SHOW_ON_MAP,
|
||||
CONF_STATE,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.exceptions import ConfigEntryAuthFailed
|
||||
from homeassistant.helpers import (
|
||||
aiohttp_client,
|
||||
@ -57,11 +62,8 @@ CONFIG_SCHEMA = cv.deprecated(DOMAIN)
|
||||
|
||||
|
||||
@callback
|
||||
def async_get_geography_id(geography_dict):
|
||||
def async_get_geography_id(geography_dict: Mapping[str, Any]) -> str:
|
||||
"""Generate a unique ID from a geography dict."""
|
||||
if not geography_dict:
|
||||
return
|
||||
|
||||
if CONF_CITY in geography_dict:
|
||||
return ", ".join(
|
||||
(
|
||||
@ -76,7 +78,9 @@ def async_get_geography_id(geography_dict):
|
||||
|
||||
|
||||
@callback
|
||||
def async_get_cloud_api_update_interval(hass, api_key, num_consumers):
|
||||
def async_get_cloud_api_update_interval(
|
||||
hass: HomeAssistant, api_key: str, num_consumers: int
|
||||
) -> timedelta:
|
||||
"""Get a leveled scan interval for a particular cloud API key.
|
||||
|
||||
This will shift based on the number of active consumers, thus keeping the user
|
||||
@ -97,18 +101,22 @@ def async_get_cloud_api_update_interval(hass, api_key, num_consumers):
|
||||
|
||||
|
||||
@callback
|
||||
def async_get_cloud_coordinators_by_api_key(hass, api_key):
|
||||
def async_get_cloud_coordinators_by_api_key(
|
||||
hass: HomeAssistant, api_key: str
|
||||
) -> list[DataUpdateCoordinator]:
|
||||
"""Get all DataUpdateCoordinator objects related to a particular API key."""
|
||||
coordinators = []
|
||||
for entry_id, coordinator in hass.data[DOMAIN][DATA_COORDINATOR].items():
|
||||
config_entry = hass.config_entries.async_get_entry(entry_id)
|
||||
if config_entry.data.get(CONF_API_KEY) == api_key:
|
||||
if config_entry and config_entry.data.get(CONF_API_KEY) == api_key:
|
||||
coordinators.append(coordinator)
|
||||
return coordinators
|
||||
|
||||
|
||||
@callback
|
||||
def async_sync_geo_coordinator_update_intervals(hass, api_key):
|
||||
def async_sync_geo_coordinator_update_intervals(
|
||||
hass: HomeAssistant, api_key: str
|
||||
) -> None:
|
||||
"""Sync the update interval for geography-based data coordinators (by API key)."""
|
||||
coordinators = async_get_cloud_coordinators_by_api_key(hass, api_key)
|
||||
|
||||
@ -129,7 +137,9 @@ def async_sync_geo_coordinator_update_intervals(hass, api_key):
|
||||
|
||||
|
||||
@callback
|
||||
def _standardize_geography_config_entry(hass, config_entry):
|
||||
def _standardize_geography_config_entry(
|
||||
hass: HomeAssistant, config_entry: ConfigEntry
|
||||
) -> None:
|
||||
"""Ensure that geography config entries have appropriate properties."""
|
||||
entry_updates = {}
|
||||
|
||||
@ -162,9 +172,11 @@ def _standardize_geography_config_entry(hass, config_entry):
|
||||
|
||||
|
||||
@callback
|
||||
def _standardize_node_pro_config_entry(hass, config_entry):
|
||||
def _standardize_node_pro_config_entry(
|
||||
hass: HomeAssistant, config_entry: ConfigEntry
|
||||
) -> None:
|
||||
"""Ensure that Node/Pro config entries have appropriate properties."""
|
||||
entry_updates = {}
|
||||
entry_updates: dict[str, Any] = {}
|
||||
|
||||
if CONF_INTEGRATION_TYPE not in config_entry.data:
|
||||
# If the config entry data doesn't contain the integration type, add it:
|
||||
@ -179,7 +191,7 @@ def _standardize_node_pro_config_entry(hass, config_entry):
|
||||
hass.config_entries.async_update_entry(config_entry, **entry_updates)
|
||||
|
||||
|
||||
async def async_setup_entry(hass, config_entry):
|
||||
async def async_setup_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
||||
"""Set up AirVisual as config entry."""
|
||||
hass.data.setdefault(DOMAIN, {DATA_COORDINATOR: {}, DATA_LISTENER: {}})
|
||||
|
||||
@ -189,7 +201,7 @@ async def async_setup_entry(hass, config_entry):
|
||||
websession = aiohttp_client.async_get_clientsession(hass)
|
||||
cloud_api = CloudAPI(config_entry.data[CONF_API_KEY], session=websession)
|
||||
|
||||
async def async_update_data():
|
||||
async def async_update_data() -> dict[str, Any]:
|
||||
"""Get new data from the API."""
|
||||
if CONF_CITY in config_entry.data:
|
||||
api_coro = cloud_api.air_quality.city(
|
||||
@ -204,7 +216,8 @@ async def async_setup_entry(hass, config_entry):
|
||||
)
|
||||
|
||||
try:
|
||||
return await api_coro
|
||||
data = await api_coro
|
||||
return cast(Dict[str, Any], data)
|
||||
except (InvalidKeyError, KeyExpiredError) as ex:
|
||||
raise ConfigEntryAuthFailed from ex
|
||||
except AirVisualError as err:
|
||||
@ -242,13 +255,14 @@ async def async_setup_entry(hass, config_entry):
|
||||
|
||||
_standardize_node_pro_config_entry(hass, config_entry)
|
||||
|
||||
async def async_update_data():
|
||||
async def async_update_data() -> dict[str, Any]:
|
||||
"""Get new data from the API."""
|
||||
try:
|
||||
async with NodeSamba(
|
||||
config_entry.data[CONF_IP_ADDRESS], config_entry.data[CONF_PASSWORD]
|
||||
) as node:
|
||||
return await node.async_get_latest_measurements()
|
||||
data = await node.async_get_latest_measurements()
|
||||
return cast(Dict[str, Any], data)
|
||||
except NodeProError as err:
|
||||
raise UpdateFailed(f"Error while retrieving data: {err}") from err
|
||||
|
||||
@ -275,7 +289,7 @@ async def async_setup_entry(hass, config_entry):
|
||||
return True
|
||||
|
||||
|
||||
async def async_migrate_entry(hass, config_entry):
|
||||
async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
||||
"""Migrate an old config entry."""
|
||||
version = config_entry.version
|
||||
|
||||
@ -317,7 +331,7 @@ async def async_migrate_entry(hass, config_entry):
|
||||
return True
|
||||
|
||||
|
||||
async def async_unload_entry(hass, config_entry):
|
||||
async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
|
||||
"""Unload an AirVisual config entry."""
|
||||
unload_ok = await hass.config_entries.async_unload_platforms(
|
||||
config_entry, PLATFORMS
|
||||
@ -338,7 +352,7 @@ async def async_unload_entry(hass, config_entry):
|
||||
return unload_ok
|
||||
|
||||
|
||||
async def async_reload_entry(hass, config_entry):
|
||||
async def async_reload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> None:
|
||||
"""Handle an options update."""
|
||||
await hass.config_entries.async_reload(config_entry.entry_id)
|
||||
|
||||
@ -346,16 +360,19 @@ async def async_reload_entry(hass, config_entry):
|
||||
class AirVisualEntity(CoordinatorEntity):
|
||||
"""Define a generic AirVisual entity."""
|
||||
|
||||
def __init__(self, coordinator):
|
||||
def __init__(self, coordinator: DataUpdateCoordinator) -> None:
|
||||
"""Initialize."""
|
||||
super().__init__(coordinator)
|
||||
self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: DEFAULT_ATTRIBUTION}
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
self._attr_extra_state_attributes: MutableMapping[str, Any] = {
|
||||
ATTR_ATTRIBUTION: DEFAULT_ATTRIBUTION
|
||||
}
|
||||
|
||||
async def async_added_to_hass(self) -> None:
|
||||
"""Register callbacks."""
|
||||
|
||||
@callback
|
||||
def update():
|
||||
def update() -> None:
|
||||
"""Update the state."""
|
||||
self.update_from_latest_data()
|
||||
self.async_write_ha_state()
|
||||
@ -365,6 +382,6 @@ class AirVisualEntity(CoordinatorEntity):
|
||||
self.update_from_latest_data()
|
||||
|
||||
@callback
|
||||
def update_from_latest_data(self):
|
||||
def update_from_latest_data(self) -> None:
|
||||
"""Update the entity from the latest data."""
|
||||
raise NotImplementedError
|
||||
|
@ -1,4 +1,6 @@
|
||||
"""Define a config flow manager for AirVisual."""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
|
||||
from pyairvisual import CloudAPI, NodeSamba
|
||||
@ -11,6 +13,7 @@ from pyairvisual.errors import (
|
||||
import voluptuous as vol
|
||||
|
||||
from homeassistant import config_entries
|
||||
from homeassistant.config_entries import ConfigEntry, OptionsFlow
|
||||
from homeassistant.const import (
|
||||
CONF_API_KEY,
|
||||
CONF_IP_ADDRESS,
|
||||
@ -21,6 +24,7 @@ from homeassistant.const import (
|
||||
CONF_STATE,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.data_entry_flow import FlowResult
|
||||
from homeassistant.helpers import aiohttp_client, config_validation as cv
|
||||
|
||||
from . import async_get_geography_id
|
||||
@ -64,13 +68,13 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
|
||||
VERSION = 2
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self) -> None:
|
||||
"""Initialize the config flow."""
|
||||
self._entry_data_for_reauth = None
|
||||
self._geo_id = None
|
||||
self._entry_data_for_reauth: dict[str, str] = {}
|
||||
self._geo_id: str | None = None
|
||||
|
||||
@property
|
||||
def geography_coords_schema(self):
|
||||
def geography_coords_schema(self) -> vol.Schema:
|
||||
"""Return the data schema for the cloud API."""
|
||||
return API_KEY_DATA_SCHEMA.extend(
|
||||
{
|
||||
@ -83,7 +87,9 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
}
|
||||
)
|
||||
|
||||
async def _async_finish_geography(self, user_input, integration_type):
|
||||
async def _async_finish_geography(
|
||||
self, user_input: dict[str, str], integration_type: str
|
||||
) -> FlowResult:
|
||||
"""Validate a Cloud API key."""
|
||||
websession = aiohttp_client.async_get_clientsession(self.hass)
|
||||
cloud_api = CloudAPI(user_input[CONF_API_KEY], session=websession)
|
||||
@ -142,25 +148,29 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
data={**user_input, CONF_INTEGRATION_TYPE: integration_type},
|
||||
)
|
||||
|
||||
async def _async_init_geography(self, user_input, integration_type):
|
||||
async def _async_init_geography(
|
||||
self, user_input: dict[str, str], integration_type: str
|
||||
) -> FlowResult:
|
||||
"""Handle the initialization of the integration via the cloud API."""
|
||||
self._geo_id = async_get_geography_id(user_input)
|
||||
await self._async_set_unique_id(self._geo_id)
|
||||
self._abort_if_unique_id_configured()
|
||||
return await self._async_finish_geography(user_input, integration_type)
|
||||
|
||||
async def _async_set_unique_id(self, unique_id):
|
||||
async def _async_set_unique_id(self, unique_id: str) -> None:
|
||||
"""Set the unique ID of the config flow and abort if it already exists."""
|
||||
await self.async_set_unique_id(unique_id)
|
||||
self._abort_if_unique_id_configured()
|
||||
|
||||
@staticmethod
|
||||
@callback
|
||||
def async_get_options_flow(config_entry):
|
||||
def async_get_options_flow(config_entry: ConfigEntry) -> OptionsFlow:
|
||||
"""Define the config flow to handle options."""
|
||||
return AirVisualOptionsFlowHandler(config_entry)
|
||||
|
||||
async def async_step_geography_by_coords(self, user_input=None):
|
||||
async def async_step_geography_by_coords(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle the initialization of the cloud API based on latitude/longitude."""
|
||||
if not user_input:
|
||||
return self.async_show_form(
|
||||
@ -171,7 +181,9 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
user_input, INTEGRATION_TYPE_GEOGRAPHY_COORDS
|
||||
)
|
||||
|
||||
async def async_step_geography_by_name(self, user_input=None):
|
||||
async def async_step_geography_by_name(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle the initialization of the cloud API based on city/state/country."""
|
||||
if not user_input:
|
||||
return self.async_show_form(
|
||||
@ -182,7 +194,9 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
user_input, INTEGRATION_TYPE_GEOGRAPHY_NAME
|
||||
)
|
||||
|
||||
async def async_step_node_pro(self, user_input=None):
|
||||
async def async_step_node_pro(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle the initialization of the integration with a Node/Pro."""
|
||||
if not user_input:
|
||||
return self.async_show_form(step_id="node_pro", data_schema=NODE_PRO_SCHEMA)
|
||||
@ -208,13 +222,15 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
data={**user_input, CONF_INTEGRATION_TYPE: INTEGRATION_TYPE_NODE_PRO},
|
||||
)
|
||||
|
||||
async def async_step_reauth(self, data):
|
||||
async def async_step_reauth(self, data: dict[str, str]) -> FlowResult:
|
||||
"""Handle configuration by re-auth."""
|
||||
self._entry_data_for_reauth = data
|
||||
self._geo_id = async_get_geography_id(data)
|
||||
return await self.async_step_reauth_confirm()
|
||||
|
||||
async def async_step_reauth_confirm(self, user_input=None):
|
||||
async def async_step_reauth_confirm(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle re-auth completion."""
|
||||
if not user_input:
|
||||
return self.async_show_form(
|
||||
@ -227,7 +243,9 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
conf, self._entry_data_for_reauth[CONF_INTEGRATION_TYPE]
|
||||
)
|
||||
|
||||
async def async_step_user(self, user_input=None):
|
||||
async def async_step_user(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
"""Handle the start of the config flow."""
|
||||
if not user_input:
|
||||
return self.async_show_form(
|
||||
@ -244,11 +262,13 @@ class AirVisualFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
|
||||
class AirVisualOptionsFlowHandler(config_entries.OptionsFlow):
|
||||
"""Handle an AirVisual options flow."""
|
||||
|
||||
def __init__(self, config_entry):
|
||||
def __init__(self, config_entry: ConfigEntry) -> None:
|
||||
"""Initialize."""
|
||||
self.config_entry = config_entry
|
||||
|
||||
async def async_step_init(self, user_input=None):
|
||||
async def async_step_init(
|
||||
self, user_input: dict[str, str] | None = None
|
||||
) -> FlowResult:
|
||||
"""Manage the options."""
|
||||
if user_input is not None:
|
||||
return self.async_create_entry(title="", data=user_input)
|
||||
|
@ -1,5 +1,8 @@
|
||||
"""Support for AirVisual air quality sensors."""
|
||||
from __future__ import annotations
|
||||
|
||||
from homeassistant.components.sensor import SensorEntity
|
||||
from homeassistant.config_entries import ConfigEntry
|
||||
from homeassistant.const import (
|
||||
ATTR_LATITUDE,
|
||||
ATTR_LONGITUDE,
|
||||
@ -18,7 +21,10 @@ from homeassistant.const import (
|
||||
PERCENTAGE,
|
||||
TEMP_CELSIUS,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.core import HomeAssistant, callback
|
||||
from homeassistant.helpers.entity import DeviceInfo
|
||||
from homeassistant.helpers.entity_platform import AddEntitiesCallback
|
||||
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
|
||||
|
||||
from . import AirVisualEntity
|
||||
from .const import (
|
||||
@ -141,10 +147,15 @@ POLLUTANT_UNITS = {
|
||||
}
|
||||
|
||||
|
||||
async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||
async def async_setup_entry(
|
||||
hass: HomeAssistant,
|
||||
config_entry: ConfigEntry,
|
||||
async_add_entities: AddEntitiesCallback,
|
||||
) -> None:
|
||||
"""Set up AirVisual sensors based on a config entry."""
|
||||
coordinator = hass.data[DOMAIN][DATA_COORDINATOR][config_entry.entry_id]
|
||||
|
||||
sensors: list[AirVisualGeographySensor | AirVisualNodeProSensor]
|
||||
if config_entry.data[CONF_INTEGRATION_TYPE] in [
|
||||
INTEGRATION_TYPE_GEOGRAPHY_COORDS,
|
||||
INTEGRATION_TYPE_GEOGRAPHY_NAME,
|
||||
@ -174,7 +185,16 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
|
||||
class AirVisualGeographySensor(AirVisualEntity, SensorEntity):
|
||||
"""Define an AirVisual sensor related to geography data via the Cloud API."""
|
||||
|
||||
def __init__(self, coordinator, config_entry, kind, name, icon, unit, locale):
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: DataUpdateCoordinator,
|
||||
config_entry: ConfigEntry,
|
||||
kind: str,
|
||||
name: str,
|
||||
icon: str,
|
||||
unit: str | None,
|
||||
locale: str,
|
||||
) -> None:
|
||||
"""Initialize."""
|
||||
super().__init__(coordinator)
|
||||
|
||||
@ -203,7 +223,7 @@ class AirVisualGeographySensor(AirVisualEntity, SensorEntity):
|
||||
return super().available and self.coordinator.data["current"]["pollution"]
|
||||
|
||||
@callback
|
||||
def update_from_latest_data(self):
|
||||
def update_from_latest_data(self) -> None:
|
||||
"""Update the entity from the latest data."""
|
||||
try:
|
||||
data = self.coordinator.data["current"]["pollution"]
|
||||
@ -260,7 +280,15 @@ class AirVisualGeographySensor(AirVisualEntity, SensorEntity):
|
||||
class AirVisualNodeProSensor(AirVisualEntity, SensorEntity):
|
||||
"""Define an AirVisual sensor related to a Node/Pro unit."""
|
||||
|
||||
def __init__(self, coordinator, kind, name, device_class, icon, unit):
|
||||
def __init__(
|
||||
self,
|
||||
coordinator: DataUpdateCoordinator,
|
||||
kind: str,
|
||||
name: str,
|
||||
device_class: str | None,
|
||||
icon: str | None,
|
||||
unit: str,
|
||||
) -> None:
|
||||
"""Initialize."""
|
||||
super().__init__(coordinator)
|
||||
|
||||
@ -274,7 +302,7 @@ class AirVisualNodeProSensor(AirVisualEntity, SensorEntity):
|
||||
self._kind = kind
|
||||
|
||||
@property
|
||||
def device_info(self):
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return device registry information for this entity."""
|
||||
return {
|
||||
"identifiers": {(DOMAIN, self.coordinator.data["serial_number"])},
|
||||
@ -288,7 +316,7 @@ class AirVisualNodeProSensor(AirVisualEntity, SensorEntity):
|
||||
}
|
||||
|
||||
@callback
|
||||
def update_from_latest_data(self):
|
||||
def update_from_latest_data(self) -> None:
|
||||
"""Update the entity from the latest data."""
|
||||
if self._kind == SENSOR_KIND_AQI:
|
||||
if self.coordinator.data["settings"]["is_aqi_usa"]:
|
||||
|
11
mypy.ini
11
mypy.ini
@ -110,6 +110,17 @@ no_implicit_optional = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.airvisual.*]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
disallow_subclassing_any = true
|
||||
disallow_untyped_calls = true
|
||||
disallow_untyped_decorators = true
|
||||
disallow_untyped_defs = true
|
||||
no_implicit_optional = true
|
||||
warn_return_any = true
|
||||
warn_unreachable = true
|
||||
|
||||
[mypy-homeassistant.components.aladdin_connect.*]
|
||||
check_untyped_defs = true
|
||||
disallow_incomplete_defs = true
|
||||
|
Reference in New Issue
Block a user