Compare commits

..

2 Commits

Author SHA1 Message Date
farmio
a166da552b fix type 2026-01-13 22:31:56 +01:00
farmio
31ae6951db KNX Expose: Add support for sending value periodically 2026-01-13 22:16:57 +01:00
1033 changed files with 1950 additions and 18801 deletions

View File

@@ -260,7 +260,7 @@ jobs:
run: |
echo "::add-matcher::.github/workflows/matchers/codespell.json"
- name: Run prek
uses: j178/prek-action@9d6a3097e0c1865ecce00cfb89fe80f2ee91b547 # v1.0.12
uses: j178/prek-action@91fd7d7cf70ae1dee9f4f44e7dfa5d1073fe6623 # v1.0.11
env:
PREK_SKIP: no-commit-to-branch,mypy,pylint,gen_requirements_all,hassfest,hassfest-metadata,hassfest-mypy-config
RUFF_OUTPUT_FORMAT: github

View File

@@ -39,7 +39,7 @@ repos:
- id: prettier
additional_dependencies:
- prettier@3.6.2
- prettier-plugin-sort-json@4.2.0
- prettier-plugin-sort-json@4.1.1
- repo: https://github.com/cdce8p/python-typing-update
rev: v0.6.0
hooks:

View File

@@ -407,7 +407,6 @@ homeassistant.components.person.*
homeassistant.components.pi_hole.*
homeassistant.components.ping.*
homeassistant.components.plugwise.*
homeassistant.components.pooldose.*
homeassistant.components.portainer.*
homeassistant.components.powerfox.*
homeassistant.components.powerwall.*

2
.vscode/tasks.json vendored
View File

@@ -120,7 +120,7 @@
{
"label": "Generate Requirements",
"type": "shell",
"command": "${command:python.interpreterPath} -m script.gen_requirements_all",
"command": "./script/gen_requirements_all.py",
"group": {
"kind": "build",
"isDefault": true

2
CODEOWNERS generated
View File

@@ -711,8 +711,6 @@ build.json @home-assistant/supervisor
/tests/components/homematic/ @pvizeli
/homeassistant/components/homematicip_cloud/ @hahn-th
/tests/components/homematicip_cloud/ @hahn-th
/homeassistant/components/homevolt/ @danielhiversen
/tests/components/homevolt/ @danielhiversen
/homeassistant/components/homewizard/ @DCSBL
/tests/components/homewizard/ @DCSBL
/homeassistant/components/honeywell/ @rdfurman @mkmer

View File

@@ -123,7 +123,6 @@ SERVICE_TRIGGER = "trigger"
NEW_TRIGGERS_CONDITIONS_FEATURE_FLAG = "new_triggers_conditions"
_EXPERIMENTAL_CONDITION_PLATFORMS = {
"fan",
"light",
}

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
import json
import logging
from typing import Any
from azure.servicebus import ServiceBusMessage
from azure.servicebus.aio import ServiceBusClient, ServiceBusSender
@@ -93,7 +92,7 @@ class ServiceBusNotificationService(BaseNotificationService):
"""Initialize the service."""
self._client = client
async def async_send_message(self, message: str, **kwargs: Any) -> None:
async def async_send_message(self, message, **kwargs):
"""Send a message."""
dto = {ATTR_ASB_MESSAGE: message}

View File

@@ -15,13 +15,5 @@
"get_events": {
"service": "mdi:calendar-month"
}
},
"triggers": {
"event_ended": {
"trigger": "mdi:calendar-end"
},
"event_started": {
"trigger": "mdi:calendar-start"
}
}
}

View File

@@ -45,14 +45,6 @@
"title": "Detected use of deprecated action calendar.list_events"
}
},
"selector": {
"trigger_offset_type": {
"options": {
"after": "After",
"before": "Before"
}
}
},
"services": {
"create_event": {
"description": "Adds a new calendar event.",
@@ -111,35 +103,5 @@
"name": "Get events"
}
},
"title": "Calendar",
"triggers": {
"event_ended": {
"description": "Triggers when a calendar event ends.",
"fields": {
"offset": {
"description": "Offset from the end of the event.",
"name": "Offset"
},
"offset_type": {
"description": "Whether to trigger before or after the end of the event, if an offset is defined.",
"name": "Offset type"
}
},
"name": "Calendar event ended"
},
"event_started": {
"description": "Triggers when a calendar event starts.",
"fields": {
"offset": {
"description": "Offset from the start of the event.",
"name": "Offset"
},
"offset_type": {
"description": "Whether to trigger before or after the start of the event, if an offset is defined.",
"name": "Offset type"
}
},
"name": "Calendar event started"
}
}
"title": "Calendar"
}

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
import datetime
@@ -11,15 +10,8 @@ from typing import TYPE_CHECKING, Any, cast
import voluptuous as vol
from homeassistant.const import (
ATTR_ENTITY_ID,
CONF_ENTITY_ID,
CONF_EVENT,
CONF_OFFSET,
CONF_OPTIONS,
CONF_TARGET,
)
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback, split_entity_id
from homeassistant.const import CONF_ENTITY_ID, CONF_EVENT, CONF_OFFSET, CONF_OPTIONS
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.automation import move_top_level_schema_fields_to_options
@@ -28,13 +20,12 @@ from homeassistant.helpers.event import (
async_track_point_in_time,
async_track_time_interval,
)
from homeassistant.helpers.target import TargetEntityChangeTracker, TargetSelection
from homeassistant.helpers.trigger import Trigger, TriggerActionRunner, TriggerConfig
from homeassistant.helpers.typing import ConfigType
from homeassistant.util import dt as dt_util
from . import CalendarEntity, CalendarEvent
from .const import DATA_COMPONENT, DOMAIN
from .const import DATA_COMPONENT
_LOGGER = logging.getLogger(__name__)
@@ -42,35 +33,19 @@ EVENT_START = "start"
EVENT_END = "end"
UPDATE_INTERVAL = datetime.timedelta(minutes=15)
CONF_OFFSET_TYPE = "offset_type"
OFFSET_TYPE_BEFORE = "before"
OFFSET_TYPE_AFTER = "after"
_SINGLE_ENTITY_EVENT_OPTIONS_SCHEMA = {
_OPTIONS_SCHEMA_DICT = {
vol.Required(CONF_ENTITY_ID): cv.entity_id,
vol.Optional(CONF_EVENT, default=EVENT_START): vol.In({EVENT_START, EVENT_END}),
vol.Optional(CONF_OFFSET, default=datetime.timedelta(0)): cv.time_period,
}
_SINGLE_ENTITY_EVENT_TRIGGER_SCHEMA = vol.Schema(
_CONFIG_SCHEMA = vol.Schema(
{
vol.Required(CONF_OPTIONS): _SINGLE_ENTITY_EVENT_OPTIONS_SCHEMA,
vol.Required(CONF_OPTIONS): _OPTIONS_SCHEMA_DICT,
},
)
_EVENT_TRIGGER_SCHEMA = vol.Schema(
{
vol.Required(CONF_OPTIONS, default={}): {
vol.Required(CONF_OFFSET, default=datetime.timedelta(0)): cv.time_period,
vol.Required(CONF_OFFSET_TYPE, default=OFFSET_TYPE_BEFORE): vol.In(
{OFFSET_TYPE_BEFORE, OFFSET_TYPE_AFTER}
),
},
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
}
)
# mypy: disallow-any-generics
@@ -80,7 +55,6 @@ class QueuedCalendarEvent:
trigger_time: datetime.datetime
event: CalendarEvent
entity_id: str
@dataclass
@@ -120,7 +94,7 @@ class Timespan:
return f"[{self.start}, {self.end})"
type EventFetcher = Callable[[Timespan], Awaitable[list[tuple[str, CalendarEvent]]]]
type EventFetcher = Callable[[Timespan], Awaitable[list[CalendarEvent]]]
type QueuedEventFetcher = Callable[[Timespan], Awaitable[list[QueuedCalendarEvent]]]
@@ -136,24 +110,15 @@ def get_entity(hass: HomeAssistant, entity_id: str) -> CalendarEntity:
return entity
def event_fetcher(hass: HomeAssistant, entity_ids: set[str]) -> EventFetcher:
def event_fetcher(hass: HomeAssistant, entity_id: str) -> EventFetcher:
"""Build an async_get_events wrapper to fetch events during a time span."""
async def async_get_events(timespan: Timespan) -> list[tuple[str, CalendarEvent]]:
async def async_get_events(timespan: Timespan) -> list[CalendarEvent]:
"""Return events active in the specified time span."""
entity = get_entity(hass, entity_id)
# Expand by one second to make the end time exclusive
end_time = timespan.end + datetime.timedelta(seconds=1)
events: list[tuple[str, CalendarEvent]] = []
for entity_id in entity_ids:
entity = get_entity(hass, entity_id)
events.extend(
(entity_id, event)
for event in await entity.async_get_events(
hass, timespan.start, end_time
)
)
return events
return await entity.async_get_events(hass, timespan.start, end_time)
return async_get_events
@@ -177,11 +142,12 @@ def queued_event_fetcher(
# Example: For an EVENT_END trigger the event may start during this
# time span, but need to be triggered later when the end happens.
results = []
for entity_id, event in active_events:
trigger_time = get_trigger_time(event)
for trigger_time, event in zip(
map(get_trigger_time, active_events), active_events, strict=False
):
if trigger_time not in offset_timespan:
continue
results.append(QueuedCalendarEvent(trigger_time + offset, event, entity_id))
results.append(QueuedCalendarEvent(trigger_time + offset, event))
_LOGGER.debug(
"Scan events @ %s%s found %s eligible of %s active",
@@ -274,7 +240,6 @@ class CalendarEventListener:
_LOGGER.debug("Dispatching event: %s", queued_event.event)
payload = {
**self._trigger_payload,
ATTR_ENTITY_ID: queued_event.entity_id,
"calendar_event": queued_event.event.as_dict(),
}
self._action_runner(payload, "calendar event state change")
@@ -295,77 +260,8 @@ class CalendarEventListener:
self._listen_next_calendar_event()
class TargetCalendarEventListener(TargetEntityChangeTracker):
"""Helper class to listen to calendar events for target entity changes."""
def __init__(
self,
hass: HomeAssistant,
target_selection: TargetSelection,
event_type: str,
offset: datetime.timedelta,
run_action: TriggerActionRunner,
) -> None:
"""Initialize the state change tracker."""
def entity_filter(entities: set[str]) -> set[str]:
return {
entity_id
for entity_id in entities
if split_entity_id(entity_id)[0] == DOMAIN
}
super().__init__(hass, target_selection, entity_filter)
self._event_type = event_type
self._offset = offset
self._run_action = run_action
self._trigger_data = {
"event": event_type,
"offset": offset,
}
self._pending_listener_task: asyncio.Task[None] | None = None
self._calendar_event_listener: CalendarEventListener | None = None
@callback
def _handle_entities_update(self, tracked_entities: set[str]) -> None:
"""Restart the listeners when the list of entities of the tracked targets is updated."""
if self._pending_listener_task:
self._pending_listener_task.cancel()
self._pending_listener_task = self._hass.async_create_task(
self._start_listening(tracked_entities)
)
async def _start_listening(self, tracked_entities: set[str]) -> None:
"""Start listening for calendar events."""
_LOGGER.debug("Tracking events for calendars: %s", tracked_entities)
if self._calendar_event_listener:
self._calendar_event_listener.async_detach()
self._calendar_event_listener = CalendarEventListener(
self._hass,
self._run_action,
self._trigger_data,
queued_event_fetcher(
event_fetcher(self._hass, tracked_entities),
self._event_type,
self._offset,
),
)
await self._calendar_event_listener.async_attach()
def _unsubscribe(self) -> None:
"""Unsubscribe from all events."""
super()._unsubscribe()
if self._pending_listener_task:
self._pending_listener_task.cancel()
self._pending_listener_task = None
if self._calendar_event_listener:
self._calendar_event_listener.async_detach()
self._calendar_event_listener = None
class SingleEntityEventTrigger(Trigger):
"""Legacy single calendar entity event trigger."""
class EventTrigger(Trigger):
"""Calendar event trigger."""
_options: dict[str, Any]
@@ -375,7 +271,7 @@ class SingleEntityEventTrigger(Trigger):
) -> ConfigType:
"""Validate complete config."""
complete_config = move_top_level_schema_fields_to_options(
complete_config, _SINGLE_ENTITY_EVENT_OPTIONS_SCHEMA
complete_config, _OPTIONS_SCHEMA_DICT
)
return await super().async_validate_complete_config(hass, complete_config)
@@ -384,7 +280,7 @@ class SingleEntityEventTrigger(Trigger):
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, _SINGLE_ENTITY_EVENT_TRIGGER_SCHEMA(config))
return cast(ConfigType, _CONFIG_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize trigger."""
@@ -415,72 +311,15 @@ class SingleEntityEventTrigger(Trigger):
run_action,
trigger_data,
queued_event_fetcher(
event_fetcher(self._hass, {entity_id}), event_type, offset
event_fetcher(self._hass, entity_id), event_type, offset
),
)
await listener.async_attach()
return listener.async_detach
class EventTrigger(Trigger):
"""Calendar event trigger."""
_options: dict[str, Any]
_event_type: str
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return cast(ConfigType, _EVENT_TRIGGER_SCHEMA(config))
def __init__(self, hass: HomeAssistant, config: TriggerConfig) -> None:
"""Initialize trigger."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target is not None
assert config.options is not None
self._target = config.target
self._options = config.options
async def async_attach_runner(
self, run_action: TriggerActionRunner
) -> CALLBACK_TYPE:
"""Attach a trigger."""
offset = self._options[CONF_OFFSET]
offset_type = self._options[CONF_OFFSET_TYPE]
if offset_type == OFFSET_TYPE_BEFORE:
offset = -offset
target_selection = TargetSelection(self._target)
if not target_selection.has_any_target:
raise HomeAssistantError(f"No target defined in {self._target}")
listener = TargetCalendarEventListener(
self._hass, target_selection, self._event_type, offset, run_action
)
return listener.async_setup()
class EventStartedTrigger(EventTrigger):
"""Calendar event started trigger."""
_event_type = EVENT_START
class EventEndedTrigger(EventTrigger):
"""Calendar event ended trigger."""
_event_type = EVENT_END
TRIGGERS: dict[str, type[Trigger]] = {
"_": SingleEntityEventTrigger,
"event_started": EventStartedTrigger,
"event_ended": EventEndedTrigger,
"_": EventTrigger,
}

View File

@@ -1,27 +0,0 @@
.trigger_common: &trigger_common
target:
entity:
domain: calendar
fields:
offset:
required: true
default:
days: 0
hours: 0
minutes: 0
seconds: 0
selector:
duration:
enable_day: true
offset_type:
required: true
default: before
selector:
select:
translation_key: trigger_offset_type
options:
- before
- after
event_started: *trigger_common
event_ended: *trigger_common

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
import voluptuous as vol
from webexpythonsdk import ApiError, WebexAPI, exceptions
@@ -52,7 +51,7 @@ class CiscoWebexNotificationService(BaseNotificationService):
self.room = room
self.client = client
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to a user."""
title = ""

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
from http import HTTPStatus
import json
import logging
from typing import Any
import requests
import voluptuous as vol
@@ -82,7 +81,7 @@ class ClicksendNotificationService(BaseNotificationService):
self.language = config[CONF_LANGUAGE]
self.voice = config[CONF_VOICE]
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a voice call to a user."""
data = {
"messages": [

View File

@@ -119,7 +119,7 @@ class Concord232ZoneSensor(BinarySensorEntity):
self._zone_type = zone_type
@property
def device_class(self) -> BinarySensorDeviceClass:
def device_class(self):
"""Return the class of this sensor, from DEVICE_CLASSES."""
return self._zone_type

View File

@@ -11,11 +11,13 @@ from homeassistant import config_entries
from homeassistant.components import websocket_api
from homeassistant.components.websocket_api import ERR_NOT_FOUND, require_admin
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
entity_registry as er,
)
from homeassistant.helpers.entity_component import async_get_entity_suggested_object_id
from homeassistant.helpers.json import json_dumps
_LOGGER = logging.getLogger(__name__)
@@ -349,12 +351,26 @@ def websocket_get_automatic_entity_ids(
if not (entry := registry.entities.get(entity_id)):
automatic_entity_ids[entity_id] = None
continue
new_entity_id = registry.async_regenerate_entity_id(
entry,
try:
suggested = async_get_entity_suggested_object_id(hass, entity_id)
except HomeAssistantError as err:
# This is raised if the entity has no object.
_LOGGER.debug(
"Unable to get suggested object ID for %s, entity ID: %s (%s)",
entry.entity_id,
entity_id,
err,
)
automatic_entity_ids[entity_id] = None
continue
suggested_entity_id = registry.async_generate_entity_id(
entry.domain,
suggested or f"{entry.platform}_{entry.unique_id}",
current_entity_id=entity_id,
reserved_entity_ids=reserved_entity_ids,
)
automatic_entity_ids[entity_id] = new_entity_id
reserved_entity_ids.add(new_entity_id)
automatic_entity_ids[entity_id] = suggested_entity_id
reserved_entity_ids.add(suggested_entity_id)
connection.send_message(
websocket_api.result_message(msg["id"], automatic_entity_ids)

View File

@@ -7,5 +7,6 @@
"integration_type": "service",
"iot_class": "local_push",
"loggers": ["datadog"],
"quality_scale": "legacy",
"requirements": ["datadog==0.52.0"]
}

View File

@@ -84,7 +84,7 @@ class DigitalOceanBinarySensor(BinarySensorEntity):
return self.data.status == "active"
@property
def device_class(self) -> BinarySensorDeviceClass:
def device_class(self):
"""Return the class of this sensor."""
return BinarySensorDeviceClass.MOVING

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
from homeassistant.components.notify import ATTR_TARGET, BaseNotificationService
from homeassistant.core import HomeAssistant
@@ -30,7 +29,7 @@ class DovadoSMSNotificationService(BaseNotificationService):
"""Initialize the service."""
self._client = client
def send_message(self, message: str, **kwargs: Any) -> None:
def send_message(self, message, **kwargs):
"""Send SMS to the specified target phone number."""
if not (target := kwargs.get(ATTR_TARGET)):
_LOGGER.error("One target is required")

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
import datetime
import logging
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity
from homeassistant.components.sensor import SensorEntity
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
@@ -96,7 +96,7 @@ class EbusdSensor(SensorEntity):
return None
@property
def device_class(self) -> SensorDeviceClass | None:
def device_class(self):
"""Return the class of this device, from component DEVICE_CLASSES."""
return self._device_class

View File

@@ -75,6 +75,6 @@ class EgardiaBinarySensor(BinarySensorEntity):
return self._state == STATE_ON
@property
def device_class(self) -> BinarySensorDeviceClass | None:
def device_class(self):
"""Return the device class."""
return self._device_class

View File

@@ -8,7 +8,7 @@
"iot_class": "local_polling",
"loggers": ["pyenphase"],
"quality_scale": "platinum",
"requirements": ["pyenphase==2.4.3"],
"requirements": ["pyenphase==2.4.2"],
"zeroconf": [
{
"type": "_enphase-envoy._tcp.local."

View File

@@ -5,10 +5,7 @@ from __future__ import annotations
import datetime
import logging
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.const import ATTR_LAST_TRIP_TIME
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
@@ -105,7 +102,7 @@ class EnvisalinkBinarySensor(EnvisalinkEntity, BinarySensorEntity):
return self._info["status"]["open"]
@property
def device_class(self) -> BinarySensorDeviceClass:
def device_class(self):
"""Return the class of this sensor, from DEVICE_CLASSES."""
return self._zone_type

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
from http import HTTPStatus
import json
import logging
from typing import Any
import requests
import voluptuous as vol
@@ -47,7 +46,7 @@ class FacebookNotificationService(BaseNotificationService):
"""Initialize the service."""
self.page_access_token = access_token
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send some message."""
payload = {"access_token": self.page_access_token}
targets = kwargs.get(ATTR_TARGET)

View File

@@ -1,17 +0,0 @@
"""Provides conditions for fans."""
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.condition import Condition, make_entity_state_condition
from . import DOMAIN
CONDITIONS: dict[str, type[Condition]] = {
"is_off": make_entity_state_condition(DOMAIN, STATE_OFF),
"is_on": make_entity_state_condition(DOMAIN, STATE_ON),
}
async def async_get_conditions(hass: HomeAssistant) -> dict[str, type[Condition]]:
"""Return the fan conditions."""
return CONDITIONS

View File

@@ -1,17 +0,0 @@
.condition_common: &condition_common
target:
entity:
domain: fan
fields:
behavior:
required: true
default: any
selector:
select:
translation_key: condition_behavior
options:
- all
- any
is_off: *condition_common
is_on: *condition_common

View File

@@ -1,12 +1,4 @@
{
"conditions": {
"is_off": {
"condition": "mdi:fan-off"
},
"is_on": {
"condition": "mdi:fan"
}
},
"entity_component": {
"_": {
"default": "mdi:fan",

View File

@@ -1,32 +1,8 @@
{
"common": {
"condition_behavior_description": "How the state should match on the targeted fans.",
"condition_behavior_name": "Behavior",
"trigger_behavior_description": "The behavior of the targeted fans to trigger on.",
"trigger_behavior_name": "Behavior"
},
"conditions": {
"is_off": {
"description": "Tests if one or more fans are off.",
"fields": {
"behavior": {
"description": "[%key:component::fan::common::condition_behavior_description%]",
"name": "[%key:component::fan::common::condition_behavior_name%]"
}
},
"name": "If a fan is off"
},
"is_on": {
"description": "Tests if one or more fans are on.",
"fields": {
"behavior": {
"description": "[%key:component::fan::common::condition_behavior_description%]",
"name": "[%key:component::fan::common::condition_behavior_name%]"
}
},
"name": "If a fan is on"
}
},
"device_automation": {
"action_type": {
"toggle": "[%key:common::device_automation::action_type::toggle%]",
@@ -89,12 +65,6 @@
}
},
"selector": {
"condition_behavior": {
"options": {
"all": "All",
"any": "Any"
}
},
"direction": {
"options": {
"forward": "Forward",

View File

@@ -2,7 +2,6 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
import logging
@@ -98,30 +97,17 @@ class FireflyDataUpdateCoordinator(DataUpdateCoordinator[FireflyCoordinatorData]
end_date = now
try:
(
accounts,
categories,
primary_currency,
budgets,
bills,
) = await asyncio.gather(
self.firefly.get_accounts(),
self.firefly.get_categories(),
self.firefly.get_currency_primary(),
self.firefly.get_budgets(start=start_date, end=end_date),
self.firefly.get_bills(),
)
category_details = await asyncio.gather(
*(
self.firefly.get_category(
category_id=int(category.id),
start=start_date,
end=end_date,
)
for category in categories
accounts = await self.firefly.get_accounts()
categories = await self.firefly.get_categories()
category_details = [
await self.firefly.get_category(
category_id=int(category.id), start=start_date, end=end_date
)
)
for category in categories
]
primary_currency = await self.firefly.get_currency_primary()
budgets = await self.firefly.get_budgets(start=start_date, end=end_date)
bills = await self.firefly.get_bills()
except FireflyAuthenticationError as err:
raise ConfigEntryAuthFailed(
translation_domain=DOMAIN,

View File

@@ -7,5 +7,5 @@
"integration_type": "service",
"iot_class": "local_polling",
"quality_scale": "bronze",
"requirements": ["pyfirefly==0.1.11"]
"requirements": ["pyfirefly==0.1.10"]
}

View File

@@ -5,7 +5,6 @@ from __future__ import annotations
import asyncio
from http import HTTPStatus
import logging
from typing import Any
import voluptuous as vol
@@ -48,7 +47,7 @@ class FlockNotificationService(BaseNotificationService):
self._url = url
self._session = session
async def async_send_message(self, message: str, **kwargs: Any) -> None:
async def async_send_message(self, message, **kwargs):
"""Send the message to the user."""
payload = {"text": message}

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from http import HTTPStatus
import logging
from typing import Any
from freesms import FreeClient
import voluptuous as vol
@@ -41,7 +40,7 @@ class FreeSMSNotificationService(BaseNotificationService):
"""Initialize the service."""
self.free_client = FreeClient(username, access_token)
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to the Free Mobile user cell."""
resp = self.free_client.send_sms(message)

View File

@@ -66,7 +66,6 @@ from .const import (
CONF_COLD_TOLERANCE,
CONF_HEATER,
CONF_HOT_TOLERANCE,
CONF_KEEP_ALIVE,
CONF_MAX_TEMP,
CONF_MIN_DUR,
CONF_MIN_TEMP,
@@ -82,6 +81,7 @@ _LOGGER = logging.getLogger(__name__)
DEFAULT_NAME = "Generic Thermostat"
CONF_INITIAL_HVAC_MODE = "initial_hvac_mode"
CONF_KEEP_ALIVE = "keep_alive"
CONF_PRECISION = "precision"
CONF_TARGET_TEMP = "target_temp"
CONF_TEMP_STEP = "target_temp_step"

View File

@@ -21,7 +21,6 @@ from .const import (
CONF_COLD_TOLERANCE,
CONF_HEATER,
CONF_HOT_TOLERANCE,
CONF_KEEP_ALIVE,
CONF_MAX_TEMP,
CONF_MIN_DUR,
CONF_MIN_TEMP,
@@ -60,9 +59,6 @@ OPTIONS_SCHEMA = {
vol.Optional(CONF_MIN_DUR): selector.DurationSelector(
selector.DurationSelectorConfig(allow_negative=False)
),
vol.Optional(CONF_KEEP_ALIVE): selector.DurationSelector(
selector.DurationSelectorConfig(allow_negative=False)
),
vol.Optional(CONF_MIN_TEMP): selector.NumberSelector(
selector.NumberSelectorConfig(
mode=selector.NumberSelectorMode.BOX, unit_of_measurement=DEGREE, step=0.1

View File

@@ -33,5 +33,4 @@ CONF_PRESETS = {
)
}
CONF_SENSOR = "target_sensor"
CONF_KEEP_ALIVE = "keep_alive"
DEFAULT_TOLERANCE = 0.3

View File

@@ -18,7 +18,6 @@
"cold_tolerance": "Cold tolerance",
"heater": "Actuator switch",
"hot_tolerance": "Hot tolerance",
"keep_alive": "Keep-alive interval",
"max_temp": "Maximum target temperature",
"min_cycle_duration": "Minimum cycle duration",
"min_temp": "Minimum target temperature",
@@ -30,7 +29,6 @@
"cold_tolerance": "Minimum amount of difference between the temperature read by the temperature sensor the target temperature that must change prior to being switched on. For example, if the target temperature is 25 and the tolerance is 0.5 the heater will start when the sensor goes below 24.5.",
"heater": "Switch entity used to cool or heat depending on A/C mode.",
"hot_tolerance": "Minimum amount of difference between the temperature read by the temperature sensor the target temperature that must change prior to being switched off. For example, if the target temperature is 25 and the tolerance is 0.5 the heater will stop when the sensor equals or goes above 25.5.",
"keep_alive": "Trigger the heater periodically to keep devices from losing state. When set, min cycle duration is ignored.",
"min_cycle_duration": "Set a minimum amount of time that the switch specified must be in its current state prior to being switched either off or on.",
"target_sensor": "Temperature sensor that reflects the current temperature."
},
@@ -47,7 +45,6 @@
"cold_tolerance": "[%key:component::generic_thermostat::config::step::user::data::cold_tolerance%]",
"heater": "[%key:component::generic_thermostat::config::step::user::data::heater%]",
"hot_tolerance": "[%key:component::generic_thermostat::config::step::user::data::hot_tolerance%]",
"keep_alive": "[%key:component::generic_thermostat::config::step::user::data::keep_alive%]",
"max_temp": "[%key:component::generic_thermostat::config::step::user::data::max_temp%]",
"min_cycle_duration": "[%key:component::generic_thermostat::config::step::user::data::min_cycle_duration%]",
"min_temp": "[%key:component::generic_thermostat::config::step::user::data::min_temp%]",
@@ -58,7 +55,6 @@
"cold_tolerance": "[%key:component::generic_thermostat::config::step::user::data_description::cold_tolerance%]",
"heater": "[%key:component::generic_thermostat::config::step::user::data_description::heater%]",
"hot_tolerance": "[%key:component::generic_thermostat::config::step::user::data_description::hot_tolerance%]",
"keep_alive": "[%key:component::generic_thermostat::config::step::user::data_description::keep_alive%]",
"min_cycle_duration": "[%key:component::generic_thermostat::config::step::user::data_description::min_cycle_duration%]",
"target_sensor": "[%key:component::generic_thermostat::config::step::user::data_description::target_sensor%]"
}

View File

@@ -57,7 +57,7 @@ class GeniusSwitch(GeniusZone, SwitchEntity):
"""Representation of a Genius Hub switch."""
@property
def device_class(self) -> SwitchDeviceClass:
def device_class(self):
"""Return the class of this device, from component DEVICE_CLASSES."""
return SwitchDeviceClass.OUTLET

View File

@@ -112,7 +112,6 @@ AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (
state_class=SensorStateClass.MEASUREMENT,
device_class=SensorDeviceClass.CO,
native_unit_of_measurement_fn=lambda x: x.pollutants.co.concentration.units,
exists_fn=lambda x: "co" in {p.code for p in x.pollutants},
value_fn=lambda x: x.pollutants.co.concentration.value,
),
AirQualitySensorEntityDescription(
@@ -144,7 +143,6 @@ AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (
translation_key="nitrogen_dioxide",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement_fn=lambda x: x.pollutants.no2.concentration.units,
exists_fn=lambda x: "no2" in {p.code for p in x.pollutants},
value_fn=lambda x: x.pollutants.no2.concentration.value,
),
AirQualitySensorEntityDescription(
@@ -152,7 +150,6 @@ AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (
translation_key="ozone",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement_fn=lambda x: x.pollutants.o3.concentration.units,
exists_fn=lambda x: "o3" in {p.code for p in x.pollutants},
value_fn=lambda x: x.pollutants.o3.concentration.value,
),
AirQualitySensorEntityDescription(
@@ -160,7 +157,6 @@ AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (
state_class=SensorStateClass.MEASUREMENT,
device_class=SensorDeviceClass.PM10,
native_unit_of_measurement_fn=lambda x: x.pollutants.pm10.concentration.units,
exists_fn=lambda x: "pm10" in {p.code for p in x.pollutants},
value_fn=lambda x: x.pollutants.pm10.concentration.value,
),
AirQualitySensorEntityDescription(
@@ -168,7 +164,6 @@ AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (
state_class=SensorStateClass.MEASUREMENT,
device_class=SensorDeviceClass.PM25,
native_unit_of_measurement_fn=lambda x: x.pollutants.pm25.concentration.units,
exists_fn=lambda x: "pm25" in {p.code for p in x.pollutants},
value_fn=lambda x: x.pollutants.pm25.concentration.value,
),
AirQualitySensorEntityDescription(
@@ -176,7 +171,6 @@ AIR_QUALITY_SENSOR_TYPES: tuple[AirQualitySensorEntityDescription, ...] = (
translation_key="sulphur_dioxide",
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement_fn=lambda x: x.pollutants.so2.concentration.units,
exists_fn=lambda x: "so2" in {p.code for p in x.pollutants},
value_fn=lambda x: x.pollutants.so2.concentration.value,
),
)

View File

@@ -346,6 +346,7 @@ class SensorGroup(GroupEntity, SensorEntity):
self._attr_name = name
if name == DEFAULT_NAME:
self._attr_name = f"{DEFAULT_NAME} {sensor_type}".capitalize()
self._attr_extra_state_attributes = {ATTR_ENTITY_ID: entity_ids}
self._attr_unique_id = unique_id
self._ignore_non_numeric = ignore_non_numeric
self.mode = all if ignore_non_numeric is False else any
@@ -373,7 +374,7 @@ class SensorGroup(GroupEntity, SensorEntity):
def async_update_group_state(self) -> None:
"""Query all members and determine the sensor group state."""
self.calculate_state_attributes(self._get_valid_entities())
states: list[str | None] = []
states: list[str] = []
valid_units = self._valid_units
valid_states: list[bool] = []
sensor_values: list[tuple[str, float, State]] = []
@@ -434,12 +435,9 @@ class SensorGroup(GroupEntity, SensorEntity):
state.attributes.get("unit_of_measurement"),
self.entity_id,
)
else:
states.append(None)
valid_states.append(False)
# Set group as unavailable if all members are unavailable or missing
self._attr_available = not all(s in (STATE_UNAVAILABLE, None) for s in states)
# Set group as unavailable if all members do not have numeric values
self._attr_available = any(numeric_state for numeric_state in valid_states)
valid_state = self.mode(
state not in (STATE_UNKNOWN, STATE_UNAVAILABLE) for state in states
@@ -448,7 +446,6 @@ class SensorGroup(GroupEntity, SensorEntity):
if not valid_state or not valid_state_numeric:
self._attr_native_value = None
self._extra_state_attribute = {}
return
# Calculate values

View File

@@ -8,7 +8,6 @@ from .coordinator import HDFuryConfigEntry, HDFuryCoordinator
PLATFORMS = [
Platform.BUTTON,
Platform.SELECT,
Platform.SENSOR,
Platform.SWITCH,
]

View File

@@ -16,50 +16,6 @@
"default": "mdi:hdmi-port"
}
},
"sensor": {
"aud0": {
"default": "mdi:audio-input-rca"
},
"aud1": {
"default": "mdi:audio-input-rca"
},
"audout": {
"default": "mdi:television-speaker"
},
"earcrx": {
"default": "mdi:audio-video"
},
"edida0": {
"default": "mdi:format-list-text"
},
"edida1": {
"default": "mdi:format-list-text"
},
"edida2": {
"default": "mdi:format-list-text"
},
"rx0": {
"default": "mdi:video-input-hdmi"
},
"rx1": {
"default": "mdi:video-input-hdmi"
},
"sink0": {
"default": "mdi:television"
},
"sink1": {
"default": "mdi:television"
},
"sink2": {
"default": "mdi:audio-video"
},
"tx0": {
"default": "mdi:cable-data"
},
"tx1": {
"default": "mdi:cable-data"
}
},
"switch": {
"autosw": {
"default": "mdi:import"

View File

@@ -1,121 +0,0 @@
"""Sensor platform for HDFury Integration."""
from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.const import EntityCategory
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from .coordinator import HDFuryConfigEntry
from .entity import HDFuryEntity
SENSORS: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key="RX0",
translation_key="rx0",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="RX1",
translation_key="rx1",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="TX0",
translation_key="tx0",
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="TX1",
translation_key="tx1",
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="AUD0",
translation_key="aud0",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="AUD1",
translation_key="aud1",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="AUDOUT",
translation_key="audout",
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="EARCRX",
translation_key="earcrx",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="SINK0",
translation_key="sink0",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="SINK1",
translation_key="sink1",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="SINK2",
translation_key="sink2",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="EDIDA0",
translation_key="edida0",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="EDIDA1",
translation_key="edida1",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
SensorEntityDescription(
key="EDIDA2",
translation_key="edida2",
entity_registry_enabled_default=False,
entity_category=EntityCategory.DIAGNOSTIC,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: HDFuryConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up sensors using the platform schema."""
coordinator = entry.runtime_data
async_add_entities(
HDFurySensor(coordinator, description)
for description in SENSORS
if description.key in coordinator.data.info
)
class HDFurySensor(HDFuryEntity, SensorEntity):
"""Base HDFury Sensor Class."""
entity_description: SensorEntityDescription
@property
def native_value(self) -> str:
"""Set Sensor Value."""
return self.coordinator.data.info[self.entity_description.key]

View File

@@ -57,50 +57,6 @@
}
}
},
"sensor": {
"aud0": {
"name": "Audio TX0"
},
"aud1": {
"name": "Audio TX1"
},
"audout": {
"name": "Audio output"
},
"earcrx": {
"name": "eARC/ARC status"
},
"edida0": {
"name": "EDID TXA0"
},
"edida1": {
"name": "EDID TXA1"
},
"edida2": {
"name": "EDID AUDA"
},
"rx0": {
"name": "Input RX0"
},
"rx1": {
"name": "Input RX1"
},
"sink0": {
"name": "EDID TX0"
},
"sink1": {
"name": "EDID TX1"
},
"sink2": {
"name": "EDID AUD"
},
"tx0": {
"name": "Output TX0"
},
"tx1": {
"name": "Output TX1"
}
},
"switch": {
"autosw": {
"name": "Auto switch inputs"

View File

@@ -191,11 +191,7 @@ class HikvisionBinarySensor(BinarySensorEntity):
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, f"{self._data.device_id}_{channel}")},
via_device=(DOMAIN, self._data.device_id),
translation_key="nvr_channel",
translation_placeholders={
"device_name": self._data.device_name,
"channel_number": str(channel),
},
name=f"{self._data.device_name} Channel {channel}",
manufacturer="Hikvision",
model="NVR Channel",
)

View File

@@ -62,11 +62,7 @@ class HikvisionCamera(Camera):
self._attr_device_info = DeviceInfo(
identifiers={(DOMAIN, f"{self._data.device_id}_{channel}")},
via_device=(DOMAIN, self._data.device_id),
translation_key="nvr_channel",
translation_placeholders={
"device_name": self._data.device_name,
"channel_number": str(channel),
},
name=f"{self._data.device_name} Channel {channel}",
manufacturer="Hikvision",
model="NVR Channel",
)

View File

@@ -29,11 +29,6 @@
}
}
},
"device": {
"nvr_channel": {
"name": "{device_name} channel {channel_number}"
}
},
"issues": {
"deprecated_yaml_import_issue": {
"description": "Configuring {integration_title} using YAML is deprecated and the import failed. Please remove the `{domain}` entry from your `configuration.yaml` file and set up the integration manually.",

View File

@@ -220,33 +220,31 @@ def get_accessory( # noqa: C901
a_type = "TemperatureSensor"
elif device_class == SensorDeviceClass.HUMIDITY and unit == PERCENTAGE:
a_type = "HumiditySensor"
elif device_class == SensorDeviceClass.PM10:
elif (
device_class == SensorDeviceClass.PM10
or SensorDeviceClass.PM10 in state.entity_id
):
a_type = "PM10Sensor"
elif device_class == SensorDeviceClass.PM25:
elif (
device_class == SensorDeviceClass.PM25
or SensorDeviceClass.PM25 in state.entity_id
):
a_type = "PM25Sensor"
elif device_class == SensorDeviceClass.NITROGEN_DIOXIDE:
a_type = "NitrogenDioxideSensor"
elif device_class == SensorDeviceClass.VOLATILE_ORGANIC_COMPOUNDS:
a_type = "VolatileOrganicCompoundsSensor"
elif device_class == SensorDeviceClass.GAS:
elif (
device_class == SensorDeviceClass.GAS
or SensorDeviceClass.GAS in state.entity_id
):
a_type = "AirQualitySensor"
elif device_class == SensorDeviceClass.CO:
a_type = "CarbonMonoxideSensor"
elif device_class == SensorDeviceClass.CO2:
elif device_class == SensorDeviceClass.CO2 or "co2" in state.entity_id:
a_type = "CarbonDioxideSensor"
elif device_class == SensorDeviceClass.ILLUMINANCE or unit == LIGHT_LUX:
a_type = "LightSensor"
# Fallbacks based on entity_id
elif SensorDeviceClass.PM10 in state.entity_id:
a_type = "PM10Sensor"
elif SensorDeviceClass.PM25 in state.entity_id:
a_type = "PM25Sensor"
elif SensorDeviceClass.GAS in state.entity_id:
a_type = "AirQualitySensor"
elif "co2" in state.entity_id:
a_type = "CarbonDioxideSensor"
else:
_LOGGER.debug(
"%s: Unsupported sensor type (device_class=%s) (unit=%s)",

View File

@@ -66,7 +66,7 @@ class HMBinarySensor(HMDevice, BinarySensorEntity):
return bool(self._hm_get_state())
@property
def device_class(self) -> BinarySensorDeviceClass | None:
def device_class(self):
"""Return the class of this sensor from DEVICE_CLASSES."""
# If state is MOTION (Only RemoteMotion working)
if self._state == "MOTION":

View File

@@ -2,8 +2,6 @@
from __future__ import annotations
from typing import Any
import voluptuous as vol
from homeassistant.components.notify import (
@@ -62,7 +60,7 @@ class HomematicNotificationService(BaseNotificationService):
self.hass = hass
self.data = data
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a notification to the device."""
data = {**self.data, **kwargs.get(ATTR_DATA, {})}

View File

@@ -1,41 +0,0 @@
"""The Homevolt integration."""
from __future__ import annotations
from homevolt import Homevolt
from homeassistant.const import CONF_HOST, CONF_PASSWORD, Platform
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .coordinator import HomevoltConfigEntry, HomevoltDataUpdateCoordinator
PLATFORMS: list[Platform] = [
Platform.NUMBER,
Platform.SELECT,
Platform.SENSOR,
Platform.SWITCH,
]
async def async_setup_entry(hass: HomeAssistant, entry: HomevoltConfigEntry) -> bool:
"""Set up Homevolt from a config entry."""
host: str = entry.data[CONF_HOST]
password: str | None = entry.data.get(CONF_PASSWORD)
websession = async_get_clientsession(hass)
client = Homevolt(host, password, websession=websession)
coordinator = HomevoltDataUpdateCoordinator(hass, entry, client)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: HomevoltConfigEntry) -> bool:
"""Unload a config entry."""
return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

View File

@@ -1,152 +0,0 @@
"""Config flow for the Homevolt integration."""
from __future__ import annotations
import logging
from typing import Any
from homevolt import Homevolt, HomevoltAuthenticationError, HomevoltConnectionError
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_HOST, CONF_PASSWORD
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.service_info.zeroconf import ZeroconfServiceInfo
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_HOST): str,
vol.Optional(CONF_PASSWORD): str,
}
)
class HomevoltConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Homevolt."""
VERSION = 1
MINOR_VERSION = 1
_host: str
_device_id: str
async def async_step_user(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle the initial step."""
errors: dict[str, str] = {}
if user_input is not None:
host = user_input[CONF_HOST]
password = user_input.get(CONF_PASSWORD)
websession = async_get_clientsession(self.hass)
client = Homevolt(host, password, websession=websession)
try:
await client.update_info()
device = client.get_device()
device_id = device.device_id
except HomevoltAuthenticationError:
errors["base"] = "invalid_auth"
except HomevoltConnectionError:
errors["base"] = "cannot_connect"
except Exception:
_LOGGER.exception(
"Error occurred while connecting to the Homevolt battery"
)
errors["base"] = "unknown"
else:
await self.async_set_unique_id(device_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title="Homevolt Local",
data={
CONF_HOST: host,
CONF_PASSWORD: password,
},
)
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
)
async def async_step_zeroconf(
self, discovery_info: ZeroconfServiceInfo
) -> ConfigFlowResult:
"""Handle zeroconf discovery."""
_LOGGER.debug("Zeroconf discovery: %s", discovery_info)
self._host = str(discovery_info.ip_address)
websession = async_get_clientsession(self.hass)
client = Homevolt(self._host, websession=websession)
try:
await client.update_info()
device = client.get_device()
self._device_id = device.device_id
except HomevoltConnectionError:
return self.async_abort(reason="cannot_connect")
except HomevoltAuthenticationError:
# Device requires authentication - proceed to discovery confirm
# where user can enter password
self._device_id = discovery_info.hostname.removesuffix(".local.")
except Exception:
_LOGGER.exception("Error occurred while connecting to the Homevolt battery")
return self.async_abort(reason="cannot_connect")
await self.async_set_unique_id(self._device_id)
self._abort_if_unique_id_configured(updates={CONF_HOST: self._host})
self.context.update(
{
"title_placeholders": {
"name": "Homevolt",
}
}
)
return await self.async_step_discovery_confirm()
async def async_step_discovery_confirm(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Confirm discovery and optionally get password."""
errors: dict[str, str] = {}
if user_input is not None:
password = user_input.get(CONF_PASSWORD)
websession = async_get_clientsession(self.hass)
client = Homevolt(self._host, password, websession=websession)
try:
await client.update_info()
device = client.get_device()
self._device_id = device.device_id
except HomevoltAuthenticationError:
errors["base"] = "invalid_auth"
except HomevoltConnectionError:
errors["base"] = "cannot_connect"
except Exception:
_LOGGER.exception(
"Error occurred while connecting to the Homevolt battery"
)
errors["base"] = "unknown"
else:
await self.async_set_unique_id(self._device_id)
self._abort_if_unique_id_configured()
return self.async_create_entry(
title="Homevolt",
data={
CONF_HOST: self._host,
CONF_PASSWORD: password,
},
)
return self.async_show_form(
step_id="discovery_confirm",
data_schema=vol.Schema({vol.Optional(CONF_PASSWORD): str}),
errors=errors,
description_placeholders={
"host": self._host,
},
)

View File

@@ -1,9 +0,0 @@
"""Constants for the Homevolt integration."""
from __future__ import annotations
from datetime import timedelta
DOMAIN = "homevolt"
MANUFACTURER = "Homevolt"
SCAN_INTERVAL = timedelta(seconds=15)

View File

@@ -1,56 +0,0 @@
"""Data update coordinator for Homevolt integration."""
from __future__ import annotations
import logging
from homevolt import (
Device,
Homevolt,
HomevoltAuthenticationError,
HomevoltConnectionError,
HomevoltError,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryAuthFailed
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN, SCAN_INTERVAL
type HomevoltConfigEntry = ConfigEntry[HomevoltDataUpdateCoordinator]
_LOGGER = logging.getLogger(__name__)
class HomevoltDataUpdateCoordinator(DataUpdateCoordinator[Device]):
"""Class to manage fetching Homevolt data."""
config_entry: HomevoltConfigEntry
def __init__(
self,
hass: HomeAssistant,
entry: HomevoltConfigEntry,
client: Homevolt,
) -> None:
"""Initialize the Homevolt coordinator."""
self.client = client
super().__init__(
hass,
_LOGGER,
name=DOMAIN,
update_interval=SCAN_INTERVAL,
config_entry=entry,
)
async def _async_update_data(self) -> Device:
"""Fetch data from the Homevolt API."""
try:
await self.client.update_info()
return self.client.get_device()
except HomevoltAuthenticationError as err:
raise ConfigEntryAuthFailed from err
except (HomevoltConnectionError, HomevoltError) as err:
raise UpdateFailed(f"Error communicating with device: {err}") from err

View File

@@ -1,18 +0,0 @@
{
"domain": "homevolt",
"name": "Homevolt",
"codeowners": ["@danielhiversen"],
"config_flow": true,
"dependencies": [],
"documentation": "https://www.home-assistant.io/integrations/homevolt",
"integration_type": "device",
"iot_class": "local_polling",
"quality_scale": "bronze",
"requirements": ["homevolt==0.2.4"],
"zeroconf": [
{
"name": "homevolt*",
"type": "_http._tcp.local."
}
]
}

View File

@@ -1,70 +0,0 @@
rules:
# Bronze
action-setup:
status: exempt
comment: Integration does not register custom actions.
appropriate-polling: done
brands: done
common-modules: done
config-flow-test-coverage: done
config-flow: done
dependency-transparency: done
docs-actions:
status: exempt
comment: Integration does not register custom actions.
docs-high-level-description: done
docs-installation-instructions: done
docs-removal-instructions: done
entity-event-setup:
status: exempt
comment: Local_polling without events
entity-unique-id: done
has-entity-name: done
runtime-data: done
test-before-configure: done
test-before-setup: done
unique-config-entry: done
# Silver
action-exceptions:
status: exempt
comment: Integration does not register custom actions.
config-entry-unloading: done
docs-configuration-parameters:
status: exempt
comment: Integration does not have an options flow.
docs-installation-parameters: todo
entity-unavailable: done
integration-owner: done
log-when-unavailable: todo
parallel-updates: done
reauthentication-flow: todo
test-coverage: todo
# Gold
devices: done
diagnostics: todo
discovery-update-info: todo
discovery: todo
docs-data-update: todo
docs-examples: todo
docs-known-limitations: todo
docs-supported-devices: todo
docs-supported-functions: todo
docs-troubleshooting: todo
docs-use-cases: todo
dynamic-devices: todo
entity-category: todo
entity-device-class: done
entity-disabled-by-default: todo
entity-translations: done
exception-translations: todo
icon-translations: todo
reconfiguration-flow: todo
repair-issues: todo
stale-devices: todo
# Platinum
async-dependency: done
inject-websession: done
strict-typing: todo

View File

@@ -1,148 +0,0 @@
"""Support for Homevolt sensors."""
from __future__ import annotations
from homevolt.models import SensorType
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorEntity,
SensorEntityDescription,
SensorStateClass,
)
from homeassistant.const import (
PERCENTAGE,
SIGNAL_STRENGTH_DECIBELS,
UnitOfElectricCurrent,
UnitOfElectricPotential,
UnitOfEnergy,
UnitOfFrequency,
UnitOfPower,
UnitOfTemperature,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddConfigEntryEntitiesCallback
from homeassistant.helpers.typing import StateType
from .coordinator import HomevoltConfigEntry, HomevoltDataUpdateCoordinator
from .entity import HomevoltEntity
PARALLEL_UPDATES = 0 # Coordinator-based updates
SENSORS: tuple[SensorEntityDescription, ...] = (
SensorEntityDescription(
key=SensorType.COUNT,
state_class=SensorStateClass.TOTAL_INCREASING,
),
SensorEntityDescription(
key=SensorType.ENERGY_TOTAL,
device_class=SensorDeviceClass.ENERGY,
state_class=SensorStateClass.TOTAL,
native_unit_of_measurement=UnitOfEnergy.WATT_HOUR,
),
SensorEntityDescription(
key=SensorType.ENERGY_INCREASING,
device_class=SensorDeviceClass.ENERGY,
state_class=SensorStateClass.TOTAL_INCREASING,
native_unit_of_measurement=UnitOfEnergy.WATT_HOUR,
),
SensorEntityDescription(
key=SensorType.FREQUENCY,
device_class=SensorDeviceClass.FREQUENCY,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfFrequency.HERTZ,
),
SensorEntityDescription(
key=SensorType.PERCENTAGE,
device_class=SensorDeviceClass.BATTERY,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=PERCENTAGE,
),
SensorEntityDescription(
key=SensorType.POWER,
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfPower.WATT,
),
SensorEntityDescription(
key=SensorType.SCHEDULE_TYPE,
),
SensorEntityDescription(
key=SensorType.SIGNAL_STRENGTH,
device_class=SensorDeviceClass.SIGNAL_STRENGTH,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=SIGNAL_STRENGTH_DECIBELS,
),
SensorEntityDescription(
key=SensorType.TEMPERATURE,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfTemperature.CELSIUS,
),
SensorEntityDescription(
key=SensorType.TEXT,
),
SensorEntityDescription(
key=SensorType.VOLTAGE,
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfElectricPotential.VOLT,
),
SensorEntityDescription(
key=SensorType.CURRENT,
device_class=SensorDeviceClass.CURRENT,
state_class=SensorStateClass.MEASUREMENT,
native_unit_of_measurement=UnitOfElectricCurrent.AMPERE,
),
)
async def async_setup_entry(
hass: HomeAssistant,
entry: HomevoltConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
) -> None:
"""Set up the Homevolt sensor."""
coordinator = entry.runtime_data
entities = []
sensors_by_key = {sensor.key: sensor for sensor in SENSORS}
for sensor_key, sensor in coordinator.data.sensors.items():
if (description := sensors_by_key.get(sensor.type)) is None:
continue
entities.append(
HomevoltSensor(
description,
coordinator,
sensor_key,
)
)
async_add_entities(entities)
class HomevoltSensor(HomevoltEntity, SensorEntity):
"""Representation of a Homevolt sensor."""
def __init__(
self,
description: SensorEntityDescription,
coordinator: HomevoltDataUpdateCoordinator,
sensor_key: str,
) -> None:
"""Initialize the sensor."""
self.entity_description = description
device_id = coordinator.data.device_id
self._attr_unique_id = f"{device_id}_{sensor_key}"
sensor_data = coordinator.data.sensors[sensor_key]
self._attr_translation_key = sensor_data.slug
self._sensor_key = sensor_key
super().__init__(coordinator, sensor_data.device_identifier)
@property
def available(self) -> bool:
"""Return if entity is available."""
return super().available and self._sensor_key in self.coordinator.data.sensors
@property
def native_value(self) -> StateType:
"""Return the native value of the sensor."""
return self.coordinator.data.sensors[self._sensor_key].value

View File

@@ -1,236 +0,0 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_device%]",
"cannot_connect": "Failed to connect"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
"invalid_auth": "[%key:common::config_flow::error::invalid_auth%]",
"unknown": "[%key:common::config_flow::error::unknown%]"
},
"flow_title": "{name}",
"step": {
"discovery_confirm": {
"data": {
"password": "[%key:common::config_flow::data::password%]"
},
"data_description": {
"password": "The local password configured for your Homevolt battery, if required."
},
"description": "A Homevolt battery was discovered at {host}. Enter the password if required."
},
"user": {
"data": {
"host": "[%key:common::config_flow::data::host%]",
"password": "[%key:common::config_flow::data::password%]"
},
"data_description": {
"host": "The IP address or hostname of your Homevolt battery on your local network.",
"password": "The local password configured for your Homevolt battery, if required."
},
"description": "Connect Home Assistant to your Homevolt battery over the local network.",
"title": "Homevolt Local"
}
}
},
"entity": {
"number": {
"battery_max_charge_power": {
"name": "Battery max charge power"
},
"battery_max_discharge_power": {
"name": "Battery max discharge power"
},
"battery_max_soc": {
"name": "Battery maximum state of charge"
},
"battery_min_soc": {
"name": "Battery minimum state of charge"
},
"battery_power_setpoint": {
"name": "Battery power setpoint"
}
},
"select": {
"battery_control_mode": {
"name": "Battery control mode"
}
},
"sensor": {
"available_charging_energy": {
"name": "Available charging energy"
},
"available_charging_power": {
"name": "Available charging power"
},
"available_discharge_energy": {
"name": "Available discharge energy"
},
"available_discharge_power": {
"name": "Available discharge power"
},
"average_rssi_grid": {
"name": "Grid average RSSI"
},
"average_rssi_load": {
"name": "Load average RSSI"
},
"battery_state_of_charge": {
"name": "Battery state of charge"
},
"charge_cycles": {
"name": "Charge cycles"
},
"energy_exported_grid": {
"name": "Grid exported energy"
},
"energy_exported_load": {
"name": "Load exported energy"
},
"energy_imported_grid": {
"name": "Grid imported energy"
},
"energy_imported_load": {
"name": "Load imported energy"
},
"exported_energy": {
"name": "Exported energy"
},
"frequency": {
"name": "Frequency"
},
"imported_energy": {
"name": "Imported energy"
},
"l1_current": {
"name": "L1 current"
},
"l1_current_grid": {
"name": "Grid L1 current"
},
"l1_current_load": {
"name": "Load L1 current"
},
"l1_l2_voltage": {
"name": "L1-L2 voltage"
},
"l1_power_grid": {
"name": "Grid L1 power"
},
"l1_power_load": {
"name": "Load L1 power"
},
"l1_voltage": {
"name": "L1 voltage"
},
"l1_voltage_grid": {
"name": "Grid L1 voltage"
},
"l1_voltage_load": {
"name": "Load L1 voltage"
},
"l2_current": {
"name": "L2 current"
},
"l2_current_grid": {
"name": "Grid L2 current"
},
"l2_current_load": {
"name": "Load L2 current"
},
"l2_l3_voltage": {
"name": "L2-L3 voltage"
},
"l2_power_grid": {
"name": "Grid L2 power"
},
"l2_power_load": {
"name": "Load L2 power"
},
"l2_voltage": {
"name": "L2 voltage"
},
"l2_voltage_grid": {
"name": "Grid L2 voltage"
},
"l2_voltage_load": {
"name": "Load L2 voltage"
},
"l3_current": {
"name": "L3 current"
},
"l3_current_grid": {
"name": "Grid L3 current"
},
"l3_current_load": {
"name": "Load L3 current"
},
"l3_l1_voltage": {
"name": "L3-L1 voltage"
},
"l3_power_grid": {
"name": "Grid L3 power"
},
"l3_power_load": {
"name": "Load L3 power"
},
"l3_voltage": {
"name": "L3 voltage"
},
"l3_voltage_grid": {
"name": "Grid L3 voltage"
},
"l3_voltage_load": {
"name": "Load L3 voltage"
},
"power": {
"name": "Power"
},
"power_grid": {
"name": "Grid power"
},
"power_load": {
"name": "Load power"
},
"rssi_grid": {
"name": "Grid RSSI"
},
"rssi_load": {
"name": "Load RSSI"
},
"schedule_id": {
"name": "Schedule ID"
},
"schedule_max_discharge": {
"name": "Schedule max discharge"
},
"schedule_max_power": {
"name": "Schedule max power"
},
"schedule_power_setpoint": {
"name": "Schedule power setpoint"
},
"schedule_type": {
"name": "Schedule type"
},
"state_of_charge": {
"name": "State of charge"
},
"system_temperature": {
"name": "System temperature"
},
"tmax": {
"name": "Maximum temperature"
},
"tmin": {
"name": "Minimum temperature"
}
},
"switch": {
"local_mode": {
"name": "Local mode"
}
}
}
}

View File

@@ -9,7 +9,6 @@ from http import HTTPStatus
import json
import logging
import time
from typing import Any
from urllib.parse import urlparse
import uuid
@@ -452,7 +451,7 @@ class HTML5NotificationService(BaseNotificationService):
"""
await self.hass.async_add_executor_job(partial(self.dismiss, **kwargs))
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to a user."""
tag = str(uuid.uuid4())
payload = {

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
from pyjoin import get_devices, send_notification
import voluptuous as vol
@@ -67,7 +66,7 @@ class JoinNotificationService(BaseNotificationService):
self._device_ids = device_ids
self._device_names = device_names
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to a user."""
title = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)
data = kwargs.get(ATTR_DATA) or {}

View File

@@ -2,8 +2,6 @@
from __future__ import annotations
from typing import Any
from homeassistant.components.notify import ATTR_DATA, BaseNotificationService
from homeassistant.core import HomeAssistant
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
@@ -29,7 +27,7 @@ class KebaNotificationService(BaseNotificationService):
"""Initialize the service."""
self._client = client
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
async def async_send_message(self, message="", **kwargs):
"""Send the message."""
text = message.replace(" ", "$") # Will be translated back by the display

View File

@@ -116,6 +116,7 @@ class KnxExposeOptions:
dpt: type[DPTBase]
respond_to_read: bool
cooldown: float
periodic_send: float
default: Any | None
value_template: Template | None
@@ -130,12 +131,17 @@ def _yaml_config_to_expose_options(config: ConfigType) -> KnxExposeOptions:
else:
dpt = DPTBase.parse_transcoder(config[ExposeSchema.CONF_KNX_EXPOSE_TYPE]) # type: ignore[assignment] # checked by schema validation
ga = parse_device_group_address(config[KNX_ADDRESS])
cooldown_seconds = config[ExposeSchema.CONF_KNX_EXPOSE_COOLDOWN].total_seconds()
periodic_send_seconds = config[
ExposeSchema.CONF_KNX_EXPOSE_PERIODIC_SEND
].total_seconds()
return KnxExposeOptions(
attribute=config.get(ExposeSchema.CONF_KNX_EXPOSE_ATTRIBUTE),
group_address=ga,
dpt=dpt,
respond_to_read=config[CONF_RESPOND_TO_READ],
cooldown=config[ExposeSchema.CONF_KNX_EXPOSE_COOLDOWN],
cooldown=cooldown_seconds,
periodic_send=periodic_send_seconds,
default=config.get(ExposeSchema.CONF_KNX_EXPOSE_DEFAULT),
value_template=config.get(CONF_VALUE_TEMPLATE),
)
@@ -167,6 +173,7 @@ class KnxExposeEntity:
respond_to_read=option.respond_to_read,
value_type=option.dpt,
cooldown=option.cooldown,
periodic_send=option.periodic_send,
),
)
for option in options

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
from abc import ABC
from collections import OrderedDict
from datetime import timedelta
import math
from typing import ClassVar, Final
@@ -538,6 +539,7 @@ class ExposeSchema(KNXPlatformSchema):
CONF_KNX_EXPOSE_ATTRIBUTE = "attribute"
CONF_KNX_EXPOSE_BINARY = "binary"
CONF_KNX_EXPOSE_COOLDOWN = "cooldown"
CONF_KNX_EXPOSE_PERIODIC_SEND = "periodic_send"
CONF_KNX_EXPOSE_DEFAULT = "default"
CONF_TIME = "time"
CONF_DATE = "date"
@@ -554,7 +556,12 @@ class ExposeSchema(KNXPlatformSchema):
)
EXPOSE_SENSOR_SCHEMA = vol.Schema(
{
vol.Optional(CONF_KNX_EXPOSE_COOLDOWN, default=0): cv.positive_float,
vol.Optional(
CONF_KNX_EXPOSE_COOLDOWN, default=timedelta(0)
): cv.positive_time_period,
vol.Optional(
CONF_KNX_EXPOSE_PERIODIC_SEND, default=timedelta(0)
): cv.positive_time_period,
vol.Optional(CONF_RESPOND_TO_READ, default=True): cv.boolean,
vol.Required(CONF_KNX_EXPOSE_TYPE): vol.Any(
CONF_KNX_EXPOSE_BINARY, sensor_type_validator

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
import aiohttp
import jsonrpc_async
@@ -94,7 +93,7 @@ class KodiNotificationService(BaseNotificationService):
self._server = jsonrpc_async.Server(self._url, **kwargs)
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
async def async_send_message(self, message="", **kwargs):
"""Send a message to Kodi."""
try:
data = kwargs.get(ATTR_DATA) or {}

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
import logging
import socket
from typing import Any
from urllib.parse import urlencode
import voluptuous as vol
@@ -74,7 +73,7 @@ class LannouncerNotificationService(BaseNotificationService):
self._host = host
self._port = port
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to Lannouncer."""
data = kwargs.get(ATTR_DATA)
if data is not None and ATTR_METHOD in data:

View File

@@ -1,14 +1,127 @@
"""Provides conditions for lights."""
from homeassistant.const import STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant
from homeassistant.helpers.condition import Condition, make_entity_state_condition
from collections.abc import Callable
from typing import TYPE_CHECKING, Any, Final, Unpack, override
import voluptuous as vol
from homeassistant.const import CONF_OPTIONS, CONF_TARGET, STATE_OFF, STATE_ON
from homeassistant.core import HomeAssistant, split_entity_id
from homeassistant.helpers import config_validation as cv, target
from homeassistant.helpers.condition import (
Condition,
ConditionChecker,
ConditionCheckParams,
ConditionConfig,
)
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
ATTR_BEHAVIOR: Final = "behavior"
BEHAVIOR_ANY: Final = "any"
BEHAVIOR_ALL: Final = "all"
STATE_CONDITION_VALID_STATES: Final = [STATE_ON, STATE_OFF]
STATE_CONDITION_OPTIONS_SCHEMA: dict[vol.Marker, Any] = {
vol.Required(ATTR_BEHAVIOR, default=BEHAVIOR_ANY): vol.In(
[BEHAVIOR_ANY, BEHAVIOR_ALL]
),
}
STATE_CONDITION_SCHEMA = vol.Schema(
{
vol.Required(CONF_TARGET): cv.TARGET_FIELDS,
vol.Required(CONF_OPTIONS): STATE_CONDITION_OPTIONS_SCHEMA,
}
)
class StateConditionBase(Condition):
"""State condition."""
@override
@classmethod
async def async_validate_config(
cls, hass: HomeAssistant, config: ConfigType
) -> ConfigType:
"""Validate config."""
return STATE_CONDITION_SCHEMA(config) # type: ignore[no-any-return]
def __init__(
self, hass: HomeAssistant, config: ConditionConfig, state: str
) -> None:
"""Initialize condition."""
super().__init__(hass, config)
if TYPE_CHECKING:
assert config.target
assert config.options
self._target = config.target
self._behavior = config.options[ATTR_BEHAVIOR]
self._state = state
@override
async def async_get_checker(self) -> ConditionChecker:
"""Get the condition checker."""
def check_any_match_state(states: list[str]) -> bool:
"""Test if any entity match the state."""
return any(state == self._state for state in states)
def check_all_match_state(states: list[str]) -> bool:
"""Test if all entities match the state."""
return all(state == self._state for state in states)
matcher: Callable[[list[str]], bool]
if self._behavior == BEHAVIOR_ANY:
matcher = check_any_match_state
elif self._behavior == BEHAVIOR_ALL:
matcher = check_all_match_state
def test_state(**kwargs: Unpack[ConditionCheckParams]) -> bool:
"""Test state condition."""
target_selection = target.TargetSelection(self._target)
targeted_entities = target.async_extract_referenced_entity_ids(
self._hass, target_selection, expand_group=False
)
referenced_entity_ids = targeted_entities.referenced.union(
targeted_entities.indirectly_referenced
)
light_entity_ids = {
entity_id
for entity_id in referenced_entity_ids
if split_entity_id(entity_id)[0] == DOMAIN
}
light_entity_states = [
state.state
for entity_id in light_entity_ids
if (state := self._hass.states.get(entity_id))
and state.state in STATE_CONDITION_VALID_STATES
]
return matcher(light_entity_states)
return test_state
class IsOnCondition(StateConditionBase):
"""Is on condition."""
def __init__(self, hass: HomeAssistant, config: ConditionConfig) -> None:
"""Initialize condition."""
super().__init__(hass, config, STATE_ON)
class IsOffCondition(StateConditionBase):
"""Is off condition."""
def __init__(self, hass: HomeAssistant, config: ConditionConfig) -> None:
"""Initialize condition."""
super().__init__(hass, config, STATE_OFF)
CONDITIONS: dict[str, type[Condition]] = {
"is_off": make_entity_state_condition(DOMAIN, STATE_OFF),
"is_on": make_entity_state_condition(DOMAIN, STATE_ON),
"is_off": IsOffCondition,
"is_on": IsOnCondition,
}

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from http import HTTPStatus
import logging
from typing import Any
import requests
import voluptuous as vol
@@ -57,7 +56,7 @@ class AutomateNotificationService(BaseNotificationService):
self._recipient = recipient
self._device = device
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to a user."""
# Extract params from data dict

View File

@@ -7,6 +7,7 @@
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["london_tube_status"],
"quality_scale": "legacy",
"requirements": ["london-tube-status==0.5"],
"single_config_entry": true
}

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
from pymailgunner import (
Client,
@@ -92,7 +91,7 @@ class MailgunNotificationService(BaseNotificationService):
return False
return True
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a mail to the recipient."""
subject = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)

View File

@@ -528,10 +528,7 @@ DISCOVERY_SCHEMAS = [
),
),
entity_class=MatterBinarySensor,
required_attributes=(
clusters.Thermostat.Attributes.RemoteSensing,
clusters.Thermostat.Attributes.OutdoorTemperature,
),
required_attributes=(clusters.Thermostat.Attributes.RemoteSensing,),
allow_multi=True,
),
MatterDiscoverySchema(

View File

@@ -642,7 +642,6 @@ DISCOVERY_SCHEMAS = [
list_attribute=clusters.DoorLock.Attributes.SupportedOperatingModes,
device_to_ha=DOOR_LOCK_OPERATING_MODE_MAP.get,
ha_to_device=DOOR_LOCK_OPERATING_MODE_MAP_REVERSE.get,
entity_category=EntityCategory.CONFIG,
),
entity_class=MatterDoorLockOperatingModeSelectEntity,
required_attributes=(

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
import asyncio
from datetime import timedelta
from http import HTTPStatus
from aiohttp import ClientConnectionError, ClientResponseError
from pymelcloud import get_devices
@@ -24,18 +23,21 @@ PLATFORMS = [Platform.CLIMATE, Platform.SENSOR, Platform.WATER_HEATER]
async def async_setup_entry(hass: HomeAssistant, entry: MelCloudConfigEntry) -> bool:
"""Establish connection with MELCloud."""
token = entry.data[CONF_TOKEN]
session = async_get_clientsession(hass)
try:
async with asyncio.timeout(10):
all_devices = await get_devices(
token=entry.data[CONF_TOKEN],
session=async_get_clientsession(hass),
token,
session,
conf_update_interval=timedelta(minutes=30),
device_set_debounce=timedelta(seconds=2),
)
except ClientResponseError as ex:
if ex.status in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
if ex.status in (401, 403):
raise ConfigEntryAuthFailed from ex
if ex.status == HTTPStatus.TOO_MANY_REQUESTS:
if ex.status == 429:
raise UpdateFailed(
"MELCloud rate limit exceeded. Your account may be temporarily blocked"
) from ex
@@ -47,21 +49,13 @@ async def async_setup_entry(hass: HomeAssistant, entry: MelCloudConfigEntry) ->
coordinators: dict[str, list[MelCloudDeviceUpdateCoordinator]] = {}
device_registry = dr.async_get(hass)
for device_type, devices in all_devices.items():
# Build coordinators for this device_type
coordinators[device_type] = [
MelCloudDeviceUpdateCoordinator(hass, device, entry) for device in devices
]
# Perform initial refreshes concurrently
await asyncio.gather(
*(
coordinator.async_config_entry_first_refresh()
for coordinator in coordinators[device_type]
)
)
# Register parent devices so zone entities can reference via_device
for coordinator in coordinators[device_type]:
coordinators[device_type] = []
for device in devices:
coordinator = MelCloudDeviceUpdateCoordinator(hass, device, entry)
# Perform initial refresh for this device
await coordinator.async_config_entry_first_refresh()
coordinators[device_type].append(coordinator)
# Register parent device now so zone entities can reference it via via_device
device_registry.async_get_or_create(
config_entry_id=entry.entry_id,
**coordinator.device_info,

View File

@@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
from collections.abc import Mapping
from http import HTTPStatus
import logging
from typing import Any
from aiohttp import ClientError, ClientResponseError
@@ -17,6 +18,8 @@ from homeassistant.helpers.aiohttp_client import async_get_clientsession
from .const import DOMAIN
_LOGGER = logging.getLogger(__name__)
class FlowHandler(ConfigFlow, domain=DOMAIN):
"""Handle a config flow."""
@@ -34,7 +37,8 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
async def _create_client(
self,
username: str,
password: str,
*,
password: str | None = None,
token: str | None = None,
) -> ConfigFlowResult:
"""Create client."""
@@ -42,13 +46,13 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
async with asyncio.timeout(10):
if (acquired_token := token) is None:
acquired_token = await pymelcloud.login(
email=username,
password=password,
session=async_get_clientsession(self.hass),
username,
password,
async_get_clientsession(self.hass),
)
await pymelcloud.get_devices(
token=acquired_token,
session=async_get_clientsession(self.hass),
acquired_token,
async_get_clientsession(self.hass),
)
except ClientResponseError as err:
if err.status in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN):
@@ -74,9 +78,8 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
{vol.Required(CONF_USERNAME): str, vol.Required(CONF_PASSWORD): str}
),
)
return await self._create_client(
username=user_input[CONF_USERNAME], password=user_input[CONF_PASSWORD]
)
username = user_input[CONF_USERNAME]
return await self._create_client(username, password=user_input[CONF_PASSWORD])
async def async_step_reauth(
self, entry_data: Mapping[str, Any]
@@ -115,9 +118,9 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
try:
async with asyncio.timeout(10):
acquired_token = await pymelcloud.login(
email=user_input[CONF_USERNAME],
password=user_input[CONF_PASSWORD],
session=async_get_clientsession(self.hass),
user_input[CONF_USERNAME],
user_input[CONF_PASSWORD],
async_get_clientsession(self.hass),
)
except (ClientResponseError, AttributeError) as err:
if (
@@ -131,7 +134,10 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
errors["base"] = "invalid_auth"
else:
errors["base"] = "cannot_connect"
except (TimeoutError, ClientError):
except (
TimeoutError,
ClientError,
):
errors["base"] = "cannot_connect"
return acquired_token, errors
@@ -149,9 +155,9 @@ class FlowHandler(ConfigFlow, domain=DOMAIN):
try:
async with asyncio.timeout(10):
acquired_token = await pymelcloud.login(
email=user_input[CONF_USERNAME],
password=user_input[CONF_PASSWORD],
session=async_get_clientsession(self.hass),
user_input[CONF_USERNAME],
user_input[CONF_PASSWORD],
async_get_clientsession(self.hass),
)
except (ClientResponseError, AttributeError) as err:
if (

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
import messagebird
from messagebird.client import ErrorException
@@ -56,7 +55,7 @@ class MessageBirdNotificationService(BaseNotificationService):
self.sender = sender
self.client = client
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message=None, **kwargs):
"""Send a message to a specified target."""
if not (targets := kwargs.get(ATTR_TARGET)):
_LOGGER.error("No target specified")

View File

@@ -116,7 +116,7 @@ class MfiSensor(SensorEntity):
return round(self._port.value, digits)
@property
def device_class(self) -> SensorDeviceClass | None:
def device_class(self):
"""Return the device class of the sensor."""
try:
tag = self._port.tag

View File

@@ -46,7 +46,7 @@ class MobileAppFlowHandler(ConfigFlow, domain=DOMAIN):
"device_tracker",
DOMAIN,
user_input[ATTR_DEVICE_ID],
object_id_base=user_input[ATTR_DEVICE_NAME],
suggested_object_id=user_input[ATTR_DEVICE_NAME],
)
await person.async_add_user_device_tracker(
self.hass, user_input[CONF_USER_ID], devt_entry.entity_id

View File

@@ -16,5 +16,5 @@
"iot_class": "local_push",
"loggers": ["nacl"],
"quality_scale": "internal",
"requirements": ["PyNaCl==1.6.2"]
"requirements": ["PyNaCl==1.6.0"]
}

View File

@@ -6,7 +6,6 @@ import asyncio
from functools import partial
from http import HTTPStatus
import logging
from typing import Any
import aiohttp
@@ -48,7 +47,7 @@ from .util import supports_push
_LOGGER = logging.getLogger(__name__)
def push_registrations(hass: HomeAssistant) -> dict[str, str]:
def push_registrations(hass):
"""Return a dictionary of push enabled registrations."""
targets = {}
@@ -91,32 +90,38 @@ async def async_get_service(
discovery_info: DiscoveryInfoType | None = None,
) -> MobileAppNotificationService:
"""Get the mobile_app notification service."""
service = hass.data[DOMAIN][DATA_NOTIFY] = MobileAppNotificationService()
service = hass.data[DOMAIN][DATA_NOTIFY] = MobileAppNotificationService(hass)
return service
class MobileAppNotificationService(BaseNotificationService):
"""Implement the notification service for mobile_app."""
def __init__(self, hass):
"""Initialize the service."""
self._hass = hass
@property
def targets(self) -> dict[str, str]:
def targets(self):
"""Return a dictionary of registered targets."""
return push_registrations(self.hass)
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
async def async_send_message(self, message="", **kwargs):
"""Send a message to the Lambda APNS gateway."""
data = {ATTR_MESSAGE: message}
# Remove default title from notifications.
if (
title_arg := kwargs.get(ATTR_TITLE)
) is not None and title_arg != ATTR_TITLE_DEFAULT:
data[ATTR_TITLE] = title_arg
kwargs.get(ATTR_TITLE) is not None
and kwargs.get(ATTR_TITLE) != ATTR_TITLE_DEFAULT
):
data[ATTR_TITLE] = kwargs.get(ATTR_TITLE)
if not (targets := kwargs.get(ATTR_TARGET)):
targets = push_registrations(self.hass).values()
if (data_arg := kwargs.get(ATTR_DATA)) is not None:
data[ATTR_DATA] = data_arg
if kwargs.get(ATTR_DATA) is not None:
data[ATTR_DATA] = kwargs.get(ATTR_DATA)
local_push_channels = self.hass.data[DOMAIN][DATA_PUSH_CHANNEL]
@@ -161,7 +166,7 @@ class MobileAppNotificationService(BaseNotificationService):
try:
async with asyncio.timeout(10):
response = await async_get_clientsession(self.hass).post(
response = await async_get_clientsession(self._hass).post(
push_url, json=target_data
)
result = await response.json()

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
import pymsteams
import voluptuous as vol
@@ -50,7 +49,7 @@ class MSTeamsNotificationService(BaseNotificationService):
"""Initialize the service."""
self._webhook_url = webhook_url
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message=None, **kwargs):
"""Send a message to the webhook."""
teams_message = pymsteams.connectorcard(self._webhook_url)

View File

@@ -1,21 +1,28 @@
"""Support for namecheap DNS services."""
from datetime import timedelta
import logging
from aiohttp import ClientError, ClientSession
import voluptuous as vol
from homeassistant.config_entries import SOURCE_IMPORT
from homeassistant.config_entries import SOURCE_IMPORT, ConfigEntry
from homeassistant.const import CONF_DOMAIN, CONF_HOST, CONF_PASSWORD
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN
from .coordinator import NamecheapConfigEntry, NamecheapDnsUpdateCoordinator
from .const import DOMAIN, UPDATE_URL
_LOGGER = logging.getLogger(__name__)
INTERVAL = timedelta(minutes=5)
CONFIG_SCHEMA = vol.Schema(
{
DOMAIN: vol.Schema(
@@ -29,6 +36,8 @@ CONFIG_SCHEMA = vol.Schema(
extra=vol.ALLOW_EXTRA,
)
type NamecheapConfigEntry = ConfigEntry[None]
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Initialize the namecheap DNS component."""
@@ -45,13 +54,37 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
async def async_setup_entry(hass: HomeAssistant, entry: NamecheapConfigEntry) -> bool:
"""Set up Namecheap DynamicDNS from a config entry."""
host = entry.data[CONF_HOST]
domain = entry.data[CONF_DOMAIN]
password = entry.data[CONF_PASSWORD]
coordinator = NamecheapDnsUpdateCoordinator(hass, entry)
await coordinator.async_config_entry_first_refresh()
entry.runtime_data = coordinator
session = async_get_clientsession(hass)
# Add a dummy listener as we do not have regular entities
entry.async_on_unload(coordinator.async_add_listener(lambda: None))
try:
if not await update_namecheapdns(session, host, domain, password):
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="update_failed",
translation_placeholders={
CONF_DOMAIN: f"{entry.data[CONF_HOST]}.{entry.data[CONF_DOMAIN]}"
},
)
except ClientError as e:
raise ConfigEntryNotReady(
translation_domain=DOMAIN,
translation_key="connection_error",
translation_placeholders={
CONF_DOMAIN: f"{entry.data[CONF_HOST]}.{entry.data[CONF_DOMAIN]}"
},
) from e
async def update_domain_interval(now):
"""Update the namecheap DNS entry."""
await update_namecheapdns(session, host, domain, password)
entry.async_on_unload(
async_track_time_interval(hass, update_domain_interval, INTERVAL)
)
return True
@@ -59,3 +92,19 @@ async def async_setup_entry(hass: HomeAssistant, entry: NamecheapConfigEntry) ->
async def async_unload_entry(hass: HomeAssistant, entry: NamecheapConfigEntry) -> bool:
"""Unload a config entry."""
return True
async def update_namecheapdns(
session: ClientSession, host: str, domain: str, password: str
):
"""Update namecheap DNS entry."""
params = {"host": host, "domain": domain, "password": password}
resp = await session.get(UPDATE_URL, params=params)
xml_string = await resp.text()
if "<ErrCount>0</ErrCount>" not in xml_string:
_LOGGER.warning("Updating namecheap domain failed: %s", domain)
return False
return True

View File

@@ -9,7 +9,7 @@ from aiohttp import ClientError
import voluptuous as vol
from homeassistant.config_entries import ConfigFlow, ConfigFlowResult
from homeassistant.const import CONF_DOMAIN, CONF_HOST, CONF_NAME, CONF_PASSWORD
from homeassistant.const import CONF_DOMAIN, CONF_HOST, CONF_PASSWORD
from homeassistant.helpers import config_validation as cv
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.selector import (
@@ -18,8 +18,8 @@ from homeassistant.helpers.selector import (
TextSelectorType,
)
from . import update_namecheapdns
from .const import DOMAIN
from .helpers import update_namecheapdns
from .issue import deprecate_yaml_issue
_LOGGER = logging.getLogger(__name__)
@@ -37,16 +37,6 @@ STEP_USER_DATA_SCHEMA = vol.Schema(
}
)
STEP_RECONFIGURE_DATA_SCHEMA = vol.Schema(
{
vol.Required(CONF_PASSWORD): TextSelector(
TextSelectorConfig(
type=TextSelectorType.PASSWORD, autocomplete="current-password"
)
),
}
)
class NamecheapDnsConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for Namecheap DynamicDNS."""
@@ -99,41 +89,3 @@ class NamecheapDnsConfigFlow(ConfigFlow, domain=DOMAIN):
deprecate_yaml_issue(self.hass, import_success=True)
return result
async def async_step_reconfigure(
self, user_input: dict[str, Any] | None = None
) -> ConfigFlowResult:
"""Handle reconfigure flow."""
errors: dict[str, str] = {}
entry = self._get_reconfigure_entry()
if user_input is not None:
session = async_get_clientsession(self.hass)
try:
if not await update_namecheapdns(
session,
entry.data[CONF_HOST],
entry.data[CONF_DOMAIN],
user_input[CONF_PASSWORD],
):
errors["base"] = "update_failed"
except ClientError:
_LOGGER.debug("Cannot connect", exc_info=True)
errors["base"] = "cannot_connect"
except Exception:
_LOGGER.exception("Unexpected exception")
errors["base"] = "unknown"
if not errors:
return self.async_update_reload_and_abort(
entry,
data_updates=user_input,
)
return self.async_show_form(
step_id="reconfigure",
data_schema=STEP_RECONFIGURE_DATA_SCHEMA,
errors=errors,
description_placeholders={CONF_NAME: entry.title},
)

View File

@@ -1,61 +0,0 @@
"""Coordinator for the Namecheap DynamicDNS integration."""
from datetime import timedelta
import logging
from aiohttp import ClientError
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_DOMAIN, CONF_HOST, CONF_PASSWORD
from homeassistant.core import HomeAssistant
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from .const import DOMAIN
from .helpers import update_namecheapdns
_LOGGER = logging.getLogger(__name__)
type NamecheapConfigEntry = ConfigEntry[NamecheapDnsUpdateCoordinator]
INTERVAL = timedelta(minutes=5)
class NamecheapDnsUpdateCoordinator(DataUpdateCoordinator[None]):
"""Namecheap DynamicDNS update coordinator."""
config_entry: NamecheapConfigEntry
def __init__(self, hass: HomeAssistant, config_entry: NamecheapConfigEntry) -> None:
"""Initialize the Namecheap DynamicDNS update coordinator."""
super().__init__(
hass,
_LOGGER,
config_entry=config_entry,
name=DOMAIN,
update_interval=INTERVAL,
)
self.session = async_get_clientsession(hass)
async def _async_update_data(self) -> None:
"""Update Namecheap DNS."""
host = self.config_entry.data[CONF_HOST]
domain = self.config_entry.data[CONF_DOMAIN]
password = self.config_entry.data[CONF_PASSWORD]
try:
if not await update_namecheapdns(self.session, host, domain, password):
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="update_failed",
translation_placeholders={CONF_DOMAIN: f"{host}.{domain}"},
)
except ClientError as e:
raise UpdateFailed(
translation_domain=DOMAIN,
translation_key="connection_error",
translation_placeholders={CONF_DOMAIN: f"{host}.{domain}"},
) from e

View File

@@ -1,24 +0,0 @@
"""Helpers for the Namecheap DynamicDNS integration."""
import logging
from aiohttp import ClientSession
from .const import UPDATE_URL
_LOGGER = logging.getLogger(__name__)
async def update_namecheapdns(
session: ClientSession, host: str, domain: str, password: str
):
"""Update namecheap DNS entry."""
params = {"host": host, "domain": domain, "password": password}
resp = await session.get(UPDATE_URL, params=params)
xml_string = await resp.text()
if "<ErrCount>0</ErrCount>" not in xml_string:
return False
return True

View File

@@ -1,8 +1,7 @@
{
"config": {
"abort": {
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]",
"reconfigure_successful": "[%key:common::config_flow::abort::reconfigure_successful%]"
"already_configured": "[%key:common::config_flow::abort::already_configured_service%]"
},
"error": {
"cannot_connect": "[%key:common::config_flow::error::cannot_connect%]",
@@ -10,15 +9,6 @@
"update_failed": "Updating DNS failed"
},
"step": {
"reconfigure": {
"data": {
"password": "[%key:component::namecheapdns::config::step::user::data::password%]"
},
"data_description": {
"password": "[%key:component::namecheapdns::config::step::user::data_description::password%]"
},
"title": "Re-configure {name}"
},
"user": {
"data": {
"domain": "[%key:common::config_flow::data::username%]",

View File

@@ -6,5 +6,6 @@
"documentation": "https://www.home-assistant.io/integrations/nederlandse_spoorwegen",
"integration_type": "service",
"iot_class": "cloud_polling",
"quality_scale": "legacy",
"requirements": ["nsapi==3.1.3"]
}

View File

@@ -2,10 +2,7 @@
from __future__ import annotations
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity_platform import AddEntitiesCallback
@@ -78,7 +75,7 @@ class NessZoneBinarySensor(BinarySensorEntity):
return self._state == 1
@property
def device_class(self) -> BinarySensorDeviceClass:
def device_class(self):
"""Return the class of this sensor, from DEVICE_CLASSES."""
return self._type

View File

@@ -40,7 +40,7 @@ class NetgearNotifyService(BaseNotificationService):
self.modem: Modem = discovery_info["modem"]
discovery_info["entry"].async_on_unload(self.async_unregister_services)
async def async_send_message(self, message: str = "", **kwargs: Any) -> None:
async def async_send_message(self, message="", **kwargs):
"""Send a message to a user."""
if not self.modem.token:

View File

@@ -7,5 +7,6 @@
"integration_type": "service",
"iot_class": "cloud_polling",
"loggers": ["pyrail"],
"quality_scale": "legacy",
"requirements": ["pyrail==0.4.1"]
}

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
import logging
import os.path
from typing import Any
from notify_events import Message
@@ -124,7 +123,7 @@ class NotifyEventsNotificationService(BaseNotificationService):
return msg
def send_message(self, message: str, **kwargs: Any) -> None:
def send_message(self, message, **kwargs):
"""Send a message."""
data = kwargs.get(ATTR_DATA) or {}
token = data.get(ATTR_TOKEN, self.token)

View File

@@ -96,7 +96,7 @@ class NX584ZoneSensor(BinarySensorEntity):
self._zone_type = zone_type
@property
def device_class(self) -> BinarySensorDeviceClass:
def device_class(self):
"""Return the class of this sensor, from DEVICE_CLASSES."""
return self._zone_type

View File

@@ -84,7 +84,7 @@ class OASATelematicsSensor(SensorEntity):
return self._name
@property
def device_class(self) -> SensorDeviceClass:
def device_class(self):
"""Return the class of this sensor."""
return SensorDeviceClass.TIMESTAMP

View File

@@ -9,6 +9,6 @@
"integration_type": "service",
"iot_class": "local_push",
"loggers": ["nacl"],
"requirements": ["PyNaCl==1.6.2"],
"requirements": ["PyNaCl==1.6.0"],
"single_config_entry": true
}

View File

@@ -2,8 +2,7 @@
from __future__ import annotations
from collections.abc import Callable, Coroutine
from typing import Any, Literal
from typing import Literal
from pooldose.type_definitions import DeviceInfoDict, ValueDict
@@ -81,10 +80,7 @@ class PooldoseEntity(CoordinatorEntity[PooldoseCoordinator]):
return platform_data.get(self.entity_description.key)
async def _async_perform_write(
self,
api_call: Callable[[str, Any], Coroutine[Any, Any, bool]],
key: str,
value: bool | str | float,
self, api_call, key: str, value: bool | str | float
) -> None:
"""Perform a write call to the API with unified error handling.

View File

@@ -11,6 +11,6 @@
"documentation": "https://www.home-assistant.io/integrations/pooldose",
"integration_type": "device",
"iot_class": "local_polling",
"quality_scale": "platinum",
"quality_scale": "gold",
"requirements": ["python-pooldose==0.8.2"]
}

View File

@@ -71,4 +71,4 @@ rules:
# Platinum
async-dependency: done
inject-websession: done
strict-typing: done
strict-typing: todo

View File

@@ -7,5 +7,5 @@
"integration_type": "hub",
"iot_class": "local_polling",
"quality_scale": "bronze",
"requirements": ["pyportainer==1.0.23"]
"requirements": ["pyportainer==1.0.22"]
}

View File

@@ -7,5 +7,6 @@
"integration_type": "service",
"iot_class": "cloud_push",
"loggers": ["prowl"],
"quality_scale": "legacy",
"requirements": ["prowlpy==1.1.1"]
}

View File

@@ -126,9 +126,9 @@ async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"media_player",
DOMAIN,
unique_id,
suggested_object_id=new_id,
config_entry=entry,
device_id=e_entry.device_id,
object_id_base=new_id,
)
_LOGGER.debug(
"PlayStation 4 identifier for entity: %s has changed",

View File

@@ -6,10 +6,7 @@ import logging
from pyqwikswitch.qwikswitch import SENSORS
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
)
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
@@ -79,6 +76,6 @@ class QSBinarySensor(QSEntity, BinarySensorEntity):
return f"qs{self.qsid}:{self.channel}"
@property
def device_class(self) -> BinarySensorDeviceClass:
def device_class(self):
"""Return the class of this sensor."""
return self._class

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from http import HTTPStatus
import logging
from typing import Any
from rocketchat_API.APIExceptions.RocketExceptions import (
RocketAuthenticationException,
@@ -70,7 +69,7 @@ class RocketChatNotificationService(BaseNotificationService):
self._room = room
self._server = RocketChat(username, password, server_url=url)
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to Rocket.Chat."""
data = kwargs.get(ATTR_DATA) or {}
resp = self._server.chat_post_message(message, channel=self._room, **data)

View File

@@ -4,7 +4,6 @@ from __future__ import annotations
from http import HTTPStatus
import logging
from typing import Any
from sendgrid import SendGridAPIClient
import voluptuous as vol
@@ -62,7 +61,7 @@ class SendgridNotificationService(BaseNotificationService):
self._sg = SendGridAPIClient(self.api_key)
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send an email to a user via SendGrid."""
subject = kwargs.get(ATTR_TITLE, ATTR_TITLE_DEFAULT)

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
import logging
from typing import Any
from clx.xms.api import MtBatchTextSmsResult
from clx.xms.client import Client
@@ -68,7 +67,7 @@ class SinchNotificationService(BaseNotificationService):
self.sender = config[CONF_SENDER]
self.client = Client(config[CONF_SERVICE_PLAN_ID], config[CONF_API_KEY])
def send_message(self, message: str = "", **kwargs: Any) -> None:
def send_message(self, message="", **kwargs):
"""Send a message to a user."""
targets = kwargs.get(ATTR_TARGET, self.default_recipients)
data = kwargs.get(ATTR_DATA) or {}

Some files were not shown because too many files have changed in this diff Show More