Use event loop scheduling for tracking time patterns

This commit is contained in:
J. Nick Koston
2020-07-20 18:32:31 +00:00
parent dd459a7855
commit f9c43f0d65
3 changed files with 91 additions and 47 deletions

View File

@@ -1,4 +1,5 @@
"""Helpers for listening to events.""" """Helpers for listening to events."""
import asyncio
from datetime import datetime, timedelta from datetime import datetime, timedelta
import functools as ft import functools as ft
import logging import logging
@@ -590,7 +591,8 @@ def async_track_utc_time_change(
matching_minutes = dt_util.parse_time_expression(minute, 0, 59) matching_minutes = dt_util.parse_time_expression(minute, 0, 59)
matching_hours = dt_util.parse_time_expression(hour, 0, 23) matching_hours = dt_util.parse_time_expression(hour, 0, 23)
next_time = None last_now: datetime = dt_util.utcnow()
next_time: datetime = last_now
def calculate_next(now: datetime) -> None: def calculate_next(now: datetime) -> None:
"""Calculate and set the next time the trigger should fire.""" """Calculate and set the next time the trigger should fire."""
@@ -603,29 +605,44 @@ def async_track_utc_time_change(
# Make sure rolling back the clock doesn't prevent the timer from # Make sure rolling back the clock doesn't prevent the timer from
# triggering. # triggering.
last_now: Optional[datetime] = None cancel_callback: Optional[asyncio.TimerHandle] = None
calculate_next(last_now)
@callback @callback
def pattern_time_change_listener(event: Event) -> None: def pattern_time_change_listener() -> None:
"""Listen for matching time_changed events.""" """Listen for matching time_changed events."""
nonlocal next_time, last_now nonlocal next_time, last_now, cancel_callback
now = event.data[ATTR_NOW] now = dt_util.utcnow()
if last_now is None or now < last_now: if now < last_now:
# Time rolled back or next time not yet calculated # Time rolled back
calculate_next(now) calculate_next(now)
last_now = now
if next_time <= now: if next_time <= now:
hass.async_run_job(action, dt_util.as_local(now) if local else now) hass.async_run_job(action, dt_util.as_local(now) if local else now)
calculate_next(now + timedelta(seconds=1)) calculate_next(now + timedelta(seconds=1))
# We can't use async_track_point_in_utc_time here because it would last_now = now
# break in the case that the system time abruptly jumps backwards.
# Our custom last_now logic takes care of resolving that scenario. cancel_callback = hass.loop.call_at(
return hass.bus.async_listen(EVENT_TIME_CHANGED, pattern_time_change_listener) hass.loop.time() + next_time.timestamp() - time.time(),
pattern_time_change_listener,
)
cancel_callback = hass.loop.call_at(
hass.loop.time() + next_time.timestamp() - time.time(),
pattern_time_change_listener,
)
@callback
def unsub_pattern_time_change_listener() -> None:
"""Cancel the call_later."""
nonlocal cancel_callback
assert cancel_callback is not None
cancel_callback.cancel()
return unsub_pattern_time_change_listener
track_utc_time_change = threaded_listener_factory(async_track_utc_time_change) track_utc_time_change = threaded_listener_factory(async_track_utc_time_change)

View File

@@ -299,8 +299,11 @@ def async_fire_time_changed(hass, datetime_):
mock_seconds_into_future = datetime_.timestamp() - time.time() mock_seconds_into_future = datetime_.timestamp() - time.time()
if mock_seconds_into_future >= future_seconds: if mock_seconds_into_future >= future_seconds:
task._run() with patch(
task.cancel() "homeassistant.util.dt.utcnow", return_value=date_util.as_utc(datetime_)
):
task._run()
task.cancel()
fire_time_changed = threadsafe_callback_factory(async_fire_time_changed) fire_time_changed = threadsafe_callback_factory(async_fire_time_changed)

View File

@@ -748,22 +748,24 @@ async def test_async_track_time_change(hass):
wildcard_runs = [] wildcard_runs = []
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
unsub = async_track_time_change(hass, lambda x: wildcard_runs.append(1)) unsub = async_track_time_change(hass, lambda x: wildcard_runs.append(1))
unsub_utc = async_track_utc_time_change( unsub_utc = async_track_utc_time_change(
hass, lambda x: specific_runs.append(1), second=[0, 30] hass, lambda x: specific_runs.append(1), second=[0, 30]
) )
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
assert len(wildcard_runs) == 1 assert len(wildcard_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 0, 15)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 0, 15))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
assert len(wildcard_runs) == 2 assert len(wildcard_runs) == 2
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 0, 30)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 0, 30))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
assert len(wildcard_runs) == 3 assert len(wildcard_runs) == 3
@@ -771,7 +773,7 @@ async def test_async_track_time_change(hass):
unsub() unsub()
unsub_utc() unsub_utc()
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 0, 30)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 0, 30))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
assert len(wildcard_runs) == 3 assert len(wildcard_runs) == 3
@@ -781,25 +783,27 @@ async def test_periodic_task_minute(hass):
"""Test periodic tasks per minute.""" """Test periodic tasks per minute."""
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
unsub = async_track_utc_time_change( unsub = async_track_utc_time_change(
hass, lambda x: specific_runs.append(1), minute="/5", second=0 hass, lambda x: specific_runs.append(1), minute="/5", second=0
) )
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 3, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 3, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 5, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 5, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
unsub() unsub()
async_fire_time_changed(hass, datetime(2014, 5, 24, 12, 5, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 12, 5, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
@@ -808,33 +812,35 @@ async def test_periodic_task_hour(hass):
"""Test periodic tasks per hour.""" """Test periodic tasks per hour."""
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
unsub = async_track_utc_time_change( unsub = async_track_utc_time_change(
hass, lambda x: specific_runs.append(1), hour="/2", minute=0, second=0 hass, lambda x: specific_runs.append(1), hour="/2", minute=0, second=0
) )
async_fire_time_changed(hass, datetime(2014, 5, 24, 22, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 22, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 24, 23, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 23, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 25, 0, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 25, 0, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
async_fire_time_changed(hass, datetime(2014, 5, 25, 1, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 25, 1, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
async_fire_time_changed(hass, datetime(2014, 5, 25, 2, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 25, 2, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 3 assert len(specific_runs) == 3
unsub() unsub()
async_fire_time_changed(hass, datetime(2014, 5, 25, 2, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 25, 2, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 3 assert len(specific_runs) == 3
@@ -843,12 +849,14 @@ async def test_periodic_task_wrong_input(hass):
"""Test periodic tasks with wrong input.""" """Test periodic tasks with wrong input."""
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
with pytest.raises(ValueError): with pytest.raises(ValueError):
async_track_utc_time_change( async_track_utc_time_change(
hass, lambda x: specific_runs.append(1), hour="/two" hass, lambda x: specific_runs.append(1), hour="/two"
) )
async_fire_time_changed(hass, datetime(2014, 5, 2, 0, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 2, 0, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 0 assert len(specific_runs) == 0
@@ -857,33 +865,35 @@ async def test_periodic_task_clock_rollback(hass):
"""Test periodic tasks with the time rolling backwards.""" """Test periodic tasks with the time rolling backwards."""
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
unsub = async_track_utc_time_change( unsub = async_track_utc_time_change(
hass, lambda x: specific_runs.append(1), hour="/2", minute=0, second=0 hass, lambda x: specific_runs.append(1), hour="/2", minute=0, second=0
) )
async_fire_time_changed(hass, datetime(2014, 5, 24, 22, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 22, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 24, 23, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 23, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 24, 22, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 22, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
async_fire_time_changed(hass, datetime(2014, 5, 24, 0, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 0, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 3 assert len(specific_runs) == 3
async_fire_time_changed(hass, datetime(2014, 5, 25, 2, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 25, 2, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 4 assert len(specific_runs) == 4
unsub() unsub()
async_fire_time_changed(hass, datetime(2014, 5, 25, 2, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 25, 2, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 4 assert len(specific_runs) == 4
@@ -892,19 +902,21 @@ async def test_periodic_task_duplicate_time(hass):
"""Test periodic tasks not triggering on duplicate time.""" """Test periodic tasks not triggering on duplicate time."""
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
unsub = async_track_utc_time_change( unsub = async_track_utc_time_change(
hass, lambda x: specific_runs.append(1), hour="/2", minute=0, second=0 hass, lambda x: specific_runs.append(1), hour="/2", minute=0, second=0
) )
async_fire_time_changed(hass, datetime(2014, 5, 24, 22, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 22, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 24, 22, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 24, 22, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed(hass, datetime(2014, 5, 25, 0, 0, 0)) async_fire_time_changed(hass, datetime(now.year + 1, 5, 25, 0, 0, 0))
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2
@@ -917,23 +929,33 @@ async def test_periodic_task_entering_dst(hass):
dt_util.set_default_time_zone(timezone) dt_util.set_default_time_zone(timezone)
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
unsub = async_track_time_change( unsub = async_track_time_change(
hass, lambda x: specific_runs.append(1), hour=2, minute=30, second=0 hass, lambda x: specific_runs.append(1), hour=2, minute=30, second=0
) )
async_fire_time_changed(hass, timezone.localize(datetime(2018, 3, 25, 1, 50, 0))) async_fire_time_changed(
hass, timezone.localize(datetime(now.year + 1, 3, 25, 1, 50, 0))
)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 0 assert len(specific_runs) == 0
async_fire_time_changed(hass, timezone.localize(datetime(2018, 3, 25, 3, 50, 0))) async_fire_time_changed(
hass, timezone.localize(datetime(now.year + 1, 3, 25, 3, 50, 0))
)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 0 assert len(specific_runs) == 0
async_fire_time_changed(hass, timezone.localize(datetime(2018, 3, 26, 1, 50, 0))) async_fire_time_changed(
hass, timezone.localize(datetime(now.year + 1, 3, 26, 1, 50, 0))
)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 0 assert len(specific_runs) == 0
async_fire_time_changed(hass, timezone.localize(datetime(2018, 3, 26, 2, 50, 0))) async_fire_time_changed(
hass, timezone.localize(datetime(now.year + 1, 3, 26, 2, 50, 0))
)
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
@@ -946,30 +968,32 @@ async def test_periodic_task_leaving_dst(hass):
dt_util.set_default_time_zone(timezone) dt_util.set_default_time_zone(timezone)
specific_runs = [] specific_runs = []
now = dt_util.utcnow()
unsub = async_track_time_change( unsub = async_track_time_change(
hass, lambda x: specific_runs.append(1), hour=2, minute=30, second=0 hass, lambda x: specific_runs.append(1), hour=2, minute=30, second=0
) )
async_fire_time_changed( async_fire_time_changed(
hass, timezone.localize(datetime(2018, 10, 28, 2, 5, 0), is_dst=False) hass, timezone.localize(datetime(now.year + 1, 10, 28, 2, 5, 0), is_dst=False)
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 0 assert len(specific_runs) == 0
async_fire_time_changed( async_fire_time_changed(
hass, timezone.localize(datetime(2018, 10, 28, 2, 55, 0), is_dst=False) hass, timezone.localize(datetime(now.year + 1, 10, 28, 2, 55, 0), is_dst=False)
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed( async_fire_time_changed(
hass, timezone.localize(datetime(2018, 10, 28, 2, 5, 0), is_dst=True) hass, timezone.localize(datetime(now.year + 1, 10, 28, 2, 5, 0), is_dst=True)
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 1 assert len(specific_runs) == 1
async_fire_time_changed( async_fire_time_changed(
hass, timezone.localize(datetime(2018, 10, 28, 2, 55, 0), is_dst=True) hass, timezone.localize(datetime(now.year + 1, 10, 28, 2, 55, 0), is_dst=True)
) )
await hass.async_block_till_done() await hass.async_block_till_done()
assert len(specific_runs) == 2 assert len(specific_runs) == 2