Add fans and battery sensor to systemmonitor

This commit is contained in:
G Johansson
2025-08-23 11:19:32 +00:00
parent af951ff0d4
commit 7ec7d85067
6 changed files with 160 additions and 31 deletions

View File

@@ -9,7 +9,7 @@ import os
from typing import TYPE_CHECKING, Any, NamedTuple
from psutil import Process
from psutil._common import sdiskusage, shwtemp, snetio, snicaddr, sswap
from psutil._common import sbattery, sdiskusage, shwtemp, snetio, snicaddr, sswap
import psutil_home_assistant as ha_psutil
from homeassistant.core import HomeAssistant
@@ -19,6 +19,7 @@ from homeassistant.util import dt as dt_util
if TYPE_CHECKING:
from . import SystemMonitorConfigEntry
from .util import read_fan_rpm
_LOGGER = logging.getLogger(__name__)
@@ -27,15 +28,17 @@ _LOGGER = logging.getLogger(__name__)
class SensorData:
"""Sensor data."""
disk_usage: dict[str, sdiskusage]
swap: sswap
memory: VirtualMemory
io_counters: dict[str, snetio]
addresses: dict[str, list[snicaddr]]
load: tuple[float, float, float]
cpu_percent: float | None
battery: sbattery | None
boot_time: datetime
cpu_percent: float | None
disk_usage: dict[str, sdiskusage]
fan_rpm: dict[str, int]
io_counters: dict[str, snetio]
load: tuple[float, float, float]
memory: VirtualMemory
processes: list[Process]
swap: sswap
temperatures: dict[str, list[shwtemp]]
def as_dict(self) -> dict[str, Any]:
@@ -43,6 +46,9 @@ class SensorData:
disk_usage = None
if self.disk_usage:
disk_usage = {k: str(v) for k, v in self.disk_usage.items()}
fan_rpm = None
if self.fan_rpm:
fan_rpm = {k: str(v) for k, v in self.fan_rpm.items()}
io_counters = None
if self.io_counters:
io_counters = {k: str(v) for k, v in self.io_counters.items()}
@@ -52,16 +58,19 @@ class SensorData:
temperatures = None
if self.temperatures:
temperatures = {k: str(v) for k, v in self.temperatures.items()}
return {
"disk_usage": disk_usage,
"swap": str(self.swap),
"memory": str(self.memory),
"io_counters": io_counters,
"addresses": addresses,
"load": str(self.load),
"cpu_percent": str(self.cpu_percent),
"battery": str(self.battery),
"boot_time": str(self.boot_time),
"cpu_percent": str(self.cpu_percent),
"disk_usage": disk_usage,
"fan_rpm": fan_rpm,
"io_counters": io_counters,
"load": str(self.load),
"memory": str(self.memory),
"processes": str(self.processes),
"swap": str(self.swap),
"temperatures": temperatures,
}
@@ -117,14 +126,16 @@ class SystemMonitorCoordinator(TimestampDataUpdateCoordinator[SensorData]):
_disk_defaults[("disks", argument)] = set()
return {
**_disk_defaults,
("swap", ""): set(),
("memory", ""): set(),
("io_counters", ""): set(),
("addresses", ""): set(),
("load", ""): set(),
("cpu_percent", ""): set(),
("battery", ""): set(),
("boot", ""): set(),
("cpu_percent", ""): set(),
("fan_rpm", ""): set(),
("io_counters", ""): set(),
("load", ""): set(),
("memory", ""): set(),
("processes", ""): set(),
("swap", ""): set(),
("temperatures", ""): set(),
}
@@ -146,15 +157,17 @@ class SystemMonitorCoordinator(TimestampDataUpdateCoordinator[SensorData]):
self._initial_update = False
return SensorData(
disk_usage=_data["disks"],
swap=_data["swap"],
memory=_data["memory"],
io_counters=_data["io_counters"],
addresses=_data["addresses"],
load=load,
cpu_percent=cpu_percent,
battery=_data["battery"],
boot_time=_data["boot_time"],
cpu_percent=cpu_percent,
disk_usage=_data["disks"],
fan_rpm=_data["fan_rpm"],
io_counters=_data["io_counters"],
load=load,
memory=_data["memory"],
processes=_data["processes"],
swap=_data["swap"],
temperatures=_data["temperatures"],
)
@@ -217,13 +230,32 @@ class SystemMonitorCoordinator(TimestampDataUpdateCoordinator[SensorData]):
except AttributeError:
_LOGGER.debug("OS does not provide temperature sensors")
fan_rpm: dict[str, int] = {}
if self.update_subscribers[("fan_rpm", "")] or self._initial_update:
try:
fan_sensors = self._psutil.sensors_fans()
fan_rpm = read_fan_rpm(fan_sensors)
_LOGGER.debug("fan_rpm: %s", fan_rpm)
except AttributeError:
_LOGGER.debug("OS does not provide fan sensors")
battery: sbattery | None = None
if self.update_subscribers[("battery", "")] or self._initial_update:
try:
battery = self._psutil.sensors_battery()
_LOGGER.debug("battery: %s", battery)
except AttributeError:
_LOGGER.debug("OS does not provide battery sensors")
return {
"disks": disks,
"swap": swap,
"memory": memory,
"io_counters": io_counters,
"addresses": addresses,
"battery": battery,
"boot_time": self.boot_time,
"disks": disks,
"fan_rpm": fan_rpm,
"io_counters": io_counters,
"memory": memory,
"processes": processes,
"swap": swap,
"temperatures": temps,
}

View File

@@ -1,6 +1,9 @@
{
"entity": {
"sensor": {
"battery": {
"default": "mdi:battery"
},
"disk_free": {
"default": "mdi:harddisk"
},
@@ -10,6 +13,9 @@
"disk_use_percent": {
"default": "mdi:harddisk"
},
"fan_rpm": {
"default": "mdi:fan"
},
"ipv4_address": {
"default": "mdi:ip-network"
},

View File

@@ -23,6 +23,7 @@ from homeassistant.components.sensor import (
)
from homeassistant.const import (
PERCENTAGE,
REVOLUTIONS_PER_MINUTE,
EntityCategory,
UnitOfDataRate,
UnitOfInformation,
@@ -130,6 +131,18 @@ class SysMonitorSensorEntityDescription(SensorEntityDescription):
SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription] = {
"battery": SysMonitorSensorEntityDescription(
key="battery",
translation_key="battery",
native_unit_of_measurement=PERCENTAGE,
device_class=SensorDeviceClass.BATTERY,
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: entity.coordinator.data.battery.percent
if entity.coordinator.data.battery
else None,
none_is_unavailable=True,
add_to_update=lambda entity: ("battery", ""),
),
"disk_free": SysMonitorSensorEntityDescription(
key="disk_free",
translation_key="disk_free",
@@ -174,6 +187,16 @@ SENSOR_TYPES: dict[str, SysMonitorSensorEntityDescription] = {
none_is_unavailable=True,
add_to_update=lambda entity: ("disks", entity.argument),
),
"fan_rpm": SysMonitorSensorEntityDescription(
key="fan_rpm",
translation_key="fan_rpm",
placeholder="name",
native_unit_of_measurement=REVOLUTIONS_PER_MINUTE,
state_class=SensorStateClass.MEASUREMENT,
value_fn=lambda entity: entity.coordinator.data.fan_rpm[entity.argument],
none_is_unavailable=True,
add_to_update=lambda entity: ("fan_rpm", ""),
),
"ipv4_address": SysMonitorSensorEntityDescription(
key="ipv4_address",
translation_key="ipv4_address",
@@ -394,7 +417,7 @@ IO_COUNTER = {
IF_ADDRS_FAMILY = {"ipv4_address": socket.AF_INET, "ipv6_address": socket.AF_INET6}
async def async_setup_entry(
async def async_setup_entry( # noqa: C901
hass: HomeAssistant,
entry: SystemMonitorConfigEntry,
async_add_entities: AddConfigEntryEntitiesCallback,
@@ -567,6 +590,33 @@ async def async_setup_entry(
)
)
if _type == "battery":
argument = ""
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
coordinator,
sensor_description,
entry.entry_id,
argument,
False,
)
)
for _arg in coordinator.data.fan_rpm:
if _type == "fan_rpm":
argument = ""
loaded_resources.add(slugify(f"{_type}_{argument}"))
entities.append(
SystemMonitorSensor(
coordinator,
sensor_description,
entry.entry_id,
_arg,
False,
)
)
# Ensure legacy imported disk_* resources are loaded if they are not part
# of mount points automatically discovered
for resource in legacy_resources:

View File

@@ -29,6 +29,9 @@
}
},
"sensor": {
"battery": {
"name": "Battery"
},
"disk_free": {
"name": "Disk free {mount_point}"
},
@@ -38,6 +41,9 @@
"disk_use_percent": {
"name": "Disk usage {mount_point}"
},
"fan_rpm": {
"name": "{name} fan RPM"
},
"ipv4_address": {
"name": "IPv4 address {ip_address}"
},

View File

@@ -3,7 +3,7 @@
import logging
import os
from psutil._common import shwtemp
from psutil._common import sfan, shwtemp
import psutil_home_assistant as ha_psutil
from homeassistant.core import HomeAssistant
@@ -84,6 +84,7 @@ def read_cpu_temperature(temps: dict[str, list[shwtemp]]) -> float | None:
entry: shwtemp
_LOGGER.debug("CPU Temperatures: %s", temps)
# {'acpitz': [shwtemp(label='', current=47.0, high=103.0, critical=103.0)] }
for name, entries in temps.items():
for i, entry in enumerate(entries, start=1):
# In case the label is empty (e.g. on Raspberry PI 4),
@@ -95,3 +96,20 @@ def read_cpu_temperature(temps: dict[str, list[shwtemp]]) -> float | None:
return round(entry.current, 1)
return None
def read_fan_rpm(fans: dict[str, list[sfan]]) -> dict[str, int]:
"""Attempt to read fan speed."""
entry: sfan
_LOGGER.debug("Fan rpm: %s", fans)
if not fans:
return {}
# {'asus': [sfan(label='cpu_fan', current=3200)] }
sensor_fans: dict[str, int] = {}
for name, entries in fans.items():
for entry in entries:
_label = name if not entry.label else entry.label
sensor_fans[_label] = round(entry.current, 0)
return sensor_fans

View File

@@ -7,7 +7,16 @@ import socket
from unittest.mock import AsyncMock, Mock, NonCallableMock, patch
from psutil import NoSuchProcess, Process
from psutil._common import sdiskpart, sdiskusage, shwtemp, snetio, snicaddr, sswap
from psutil._common import (
sbattery,
sdiskpart,
sdiskusage,
sfan,
shwtemp,
snetio,
snicaddr,
sswap,
)
import pytest
from homeassistant.components.systemmonitor.const import DOMAIN
@@ -182,6 +191,14 @@ def mock_psutil(mock_process: list[MockProcess]) -> Generator:
]
mock_psutil.boot_time.return_value = 1708786800.0
mock_psutil.NoSuchProcess = NoSuchProcess
# mock_psutil.sensors_fans = Mock()
mock_psutil.sensors_fans.return_value = {
"fan1": [sfan("fan1", 1200)],
"fan2": [sfan("fan2", 1300)],
}
mock_psutil.sensors_battery.return_value = sbattery(
percent=93, secsleft=16628, power_plugged=False
)
yield mock_psutil