mirror of
https://github.com/home-assistant/core.git
synced 2025-08-04 21:25:13 +02:00
Add 'max_sub_interval' option to derivative sensor (#125870)
* Add 'max_sub_interval' option to derivative sensor * add strings * little coverage * improve test accuracy * reimplement at dev head * string * handle unavailable * simplify * Add self to codeowner * fix on remove * Update homeassistant/components/derivative/sensor.py Co-authored-by: Erik Montnemery <erik@montnemery.com> * Fix parenthesis * sort strings --------- Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
@@ -36,6 +36,7 @@ async def test_config_flow(hass: HomeAssistant, platform) -> None:
|
||||
"source": input_sensor_entity_id,
|
||||
"time_window": {"seconds": 0},
|
||||
"unit_time": "min",
|
||||
"max_sub_interval": {"minutes": 1},
|
||||
},
|
||||
)
|
||||
await hass.async_block_till_done()
|
||||
@@ -49,6 +50,7 @@ async def test_config_flow(hass: HomeAssistant, platform) -> None:
|
||||
"source": "sensor.input",
|
||||
"time_window": {"seconds": 0.0},
|
||||
"unit_time": "min",
|
||||
"max_sub_interval": {"minutes": 1.0},
|
||||
}
|
||||
assert len(mock_setup_entry.mock_calls) == 1
|
||||
|
||||
@@ -60,6 +62,7 @@ async def test_config_flow(hass: HomeAssistant, platform) -> None:
|
||||
"source": "sensor.input",
|
||||
"time_window": {"seconds": 0.0},
|
||||
"unit_time": "min",
|
||||
"max_sub_interval": {"minutes": 1.0},
|
||||
}
|
||||
assert config_entry.title == "My derivative"
|
||||
|
||||
@@ -78,6 +81,7 @@ async def test_options(hass: HomeAssistant, platform) -> None:
|
||||
"time_window": {"seconds": 0.0},
|
||||
"unit_prefix": "k",
|
||||
"unit_time": "min",
|
||||
"max_sub_interval": {"seconds": 30},
|
||||
},
|
||||
title="My derivative",
|
||||
)
|
||||
|
@@ -9,13 +9,13 @@ from freezegun import freeze_time
|
||||
|
||||
from homeassistant.components.derivative.const import DOMAIN
|
||||
from homeassistant.components.sensor import ATTR_STATE_CLASS, SensorStateClass
|
||||
from homeassistant.const import UnitOfPower, UnitOfTime
|
||||
from homeassistant.const import STATE_UNAVAILABLE, UnitOfPower, UnitOfTime
|
||||
from homeassistant.core import HomeAssistant, State
|
||||
from homeassistant.helpers import device_registry as dr, entity_registry as er
|
||||
from homeassistant.setup import async_setup_component
|
||||
from homeassistant.util import dt as dt_util
|
||||
|
||||
from tests.common import MockConfigEntry
|
||||
from tests.common import MockConfigEntry, async_fire_time_changed
|
||||
|
||||
|
||||
async def test_state(hass: HomeAssistant) -> None:
|
||||
@@ -371,6 +371,177 @@ async def test_double_signal_after_delay(hass: HomeAssistant) -> None:
|
||||
previous = derivative
|
||||
|
||||
|
||||
async def test_sub_intervals_instantaneous(hass: HomeAssistant) -> None:
|
||||
"""Test derivative sensor state."""
|
||||
# We simulate the following situation:
|
||||
# Value changes from 0 to 10 in 5 seconds (derivative = 2)
|
||||
# The max_sub_interval is 20 seconds
|
||||
# After max_sub_interval elapses, derivative should change to 0
|
||||
# Value changes to 0, 35 seconds after changing to 10 (derivative = -10/35 = -0.29)
|
||||
# State goes unavailable, derivative stops changing after that.
|
||||
# State goes back to 0, derivative returns to 0 after a max_sub_interval
|
||||
|
||||
max_sub_interval = 20
|
||||
|
||||
config, entity_id = await _setup_sensor(
|
||||
hass,
|
||||
{
|
||||
"unit_time": UnitOfTime.SECONDS,
|
||||
"round": 2,
|
||||
"max_sub_interval": {"seconds": max_sub_interval},
|
||||
},
|
||||
)
|
||||
|
||||
base = dt_util.utcnow()
|
||||
with freeze_time(base) as freezer:
|
||||
freezer.move_to(base)
|
||||
hass.states.async_set(entity_id, 0, {}, force_update=True)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
now = base + timedelta(seconds=5)
|
||||
freezer.move_to(now)
|
||||
hass.states.async_set(entity_id, 10, {}, force_update=True)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == 2
|
||||
|
||||
# No change yet as sub_interval not elapsed
|
||||
now += timedelta(seconds=15)
|
||||
async_fire_time_changed(hass, now)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == 2
|
||||
|
||||
# After 5 more seconds the sub_interval should fire and derivative should be 0
|
||||
now += timedelta(seconds=10)
|
||||
async_fire_time_changed(hass, now)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == 0
|
||||
|
||||
now += timedelta(seconds=10)
|
||||
freezer.move_to(now)
|
||||
hass.states.async_set(entity_id, 0, {}, force_update=True)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == -0.29
|
||||
|
||||
now += timedelta(seconds=10)
|
||||
freezer.move_to(now)
|
||||
hass.states.async_set(entity_id, STATE_UNAVAILABLE, {}, force_update=True)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == -0.29
|
||||
|
||||
now += timedelta(seconds=60)
|
||||
async_fire_time_changed(hass, now)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == -0.29
|
||||
|
||||
now += timedelta(seconds=10)
|
||||
freezer.move_to(now)
|
||||
hass.states.async_set(entity_id, 0, {}, force_update=True)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == -0.29
|
||||
|
||||
now += timedelta(seconds=max_sub_interval + 1)
|
||||
async_fire_time_changed(hass, now)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
assert derivative == 0
|
||||
|
||||
|
||||
async def test_sub_intervals_with_time_window(hass: HomeAssistant) -> None:
|
||||
"""Test derivative sensor state."""
|
||||
# We simulate the following situation:
|
||||
# The value rises by 1 every second for 1 minute, then pauses
|
||||
# The time window is 30 seconds
|
||||
# The max_sub_interval is 5 seconds
|
||||
# After the value stops increasing, the derivative should slowly trend back to 0
|
||||
|
||||
values = []
|
||||
for value in range(60):
|
||||
values += [value]
|
||||
time_window = 30
|
||||
max_sub_interval = 5
|
||||
times = values
|
||||
|
||||
config, entity_id = await _setup_sensor(
|
||||
hass,
|
||||
{
|
||||
"time_window": {"seconds": time_window},
|
||||
"unit_time": UnitOfTime.SECONDS,
|
||||
"round": 2,
|
||||
"max_sub_interval": {"seconds": max_sub_interval},
|
||||
},
|
||||
)
|
||||
|
||||
base = dt_util.utcnow()
|
||||
with freeze_time(base) as freezer:
|
||||
last_state_change = None
|
||||
for time, value in zip(times, values, strict=False):
|
||||
now = base + timedelta(seconds=time)
|
||||
freezer.move_to(now)
|
||||
hass.states.async_set(entity_id, value, {}, force_update=True)
|
||||
last_state_change = now
|
||||
await hass.async_block_till_done()
|
||||
|
||||
if time_window < time:
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
# Test that the error is never more than
|
||||
# (time_window_in_minutes / true_derivative * 100) = 1% + ε
|
||||
assert abs(1 - derivative) <= 0.01 + 1e-6
|
||||
|
||||
for time in range(60):
|
||||
now = last_state_change + timedelta(seconds=time)
|
||||
freezer.move_to(now)
|
||||
|
||||
async_fire_time_changed(hass, now)
|
||||
await hass.async_block_till_done()
|
||||
|
||||
state = hass.states.get("sensor.power")
|
||||
derivative = round(float(state.state), config["sensor"]["round"])
|
||||
|
||||
def calc_expected(elapsed_seconds: int, calculation_delay: int = 0):
|
||||
last_sub_interval = (
|
||||
elapsed_seconds // max_sub_interval
|
||||
) * max_sub_interval
|
||||
return (
|
||||
0
|
||||
if (last_sub_interval >= time_window)
|
||||
else (
|
||||
(time_window - last_sub_interval - calculation_delay)
|
||||
/ time_window
|
||||
)
|
||||
)
|
||||
|
||||
rounding_err = 0.01 + 1e-6
|
||||
expect_max = calc_expected(time) + rounding_err
|
||||
# Allow one second of slop for internal delays
|
||||
expect_min = calc_expected(time, 1) - rounding_err
|
||||
|
||||
assert expect_min <= derivative <= expect_max, f"Failed at time {time}"
|
||||
|
||||
|
||||
async def test_prefix(hass: HomeAssistant) -> None:
|
||||
"""Test derivative sensor state using a power source."""
|
||||
config = {
|
||||
|
Reference in New Issue
Block a user