Add history/history_during_period websocket endpoint (#71688)

J. Nick Koston
2022-05-11 17:52:22 -05:00
committed by GitHub
parent 81e8d2ab86
commit e2cef55162
8 changed files with 655 additions and 88 deletions

View File

@@ -1,12 +1,12 @@
"""Provide pre-made queries on top of the recorder component.""" """Provide pre-made queries on top of the recorder component."""
from __future__ import annotations from __future__ import annotations
from collections.abc import Iterable from collections.abc import Iterable, MutableMapping
from datetime import datetime as dt, timedelta from datetime import datetime as dt, timedelta
from http import HTTPStatus from http import HTTPStatus
import logging import logging
import time import time
from typing import cast from typing import Any, cast
from aiohttp import web from aiohttp import web
from sqlalchemy import not_, or_ from sqlalchemy import not_, or_
@@ -25,7 +25,7 @@ from homeassistant.components.recorder.statistics import (
) )
from homeassistant.components.recorder.util import session_scope from homeassistant.components.recorder.util import session_scope
from homeassistant.const import CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE from homeassistant.const import CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE
from homeassistant.core import HomeAssistant from homeassistant.core import HomeAssistant, State
import homeassistant.helpers.config_validation as cv import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.deprecation import deprecated_class, deprecated_function from homeassistant.helpers.deprecation import deprecated_class, deprecated_function
from homeassistant.helpers.entityfilter import ( from homeassistant.helpers.entityfilter import (
@@ -40,6 +40,7 @@ import homeassistant.util.dt as dt_util
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
DOMAIN = "history" DOMAIN = "history"
HISTORY_FILTERS = "history_filters"
CONF_ORDER = "use_include_order" CONF_ORDER = "use_include_order"
GLOB_TO_SQL_CHARS = { GLOB_TO_SQL_CHARS = {
@@ -83,7 +84,9 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the history hooks.""" """Set up the history hooks."""
conf = config.get(DOMAIN, {}) conf = config.get(DOMAIN, {})
filters = sqlalchemy_filter_from_include_exclude_conf(conf) hass.data[HISTORY_FILTERS] = filters = sqlalchemy_filter_from_include_exclude_conf(
conf
)
use_include_order = conf.get(CONF_ORDER) use_include_order = conf.get(CONF_ORDER)
@@ -91,6 +94,7 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
frontend.async_register_built_in_panel(hass, "history", "history", "hass:chart-box") frontend.async_register_built_in_panel(hass, "history", "history", "hass:chart-box")
websocket_api.async_register_command(hass, ws_get_statistics_during_period) websocket_api.async_register_command(hass, ws_get_statistics_during_period)
websocket_api.async_register_command(hass, ws_get_list_statistic_ids) websocket_api.async_register_command(hass, ws_get_list_statistic_ids)
websocket_api.async_register_command(hass, ws_get_history_during_period)
return True return True
@@ -163,6 +167,79 @@ async def ws_get_list_statistic_ids(
connection.send_result(msg["id"], statistic_ids) connection.send_result(msg["id"], statistic_ids)
@websocket_api.websocket_command(
{
vol.Required("type"): "history/history_during_period",
vol.Required("start_time"): str,
vol.Optional("end_time"): str,
vol.Optional("entity_ids"): [str],
vol.Optional("include_start_time_state", default=True): bool,
vol.Optional("significant_changes_only", default=True): bool,
vol.Optional("minimal_response", default=False): bool,
vol.Optional("no_attributes", default=False): bool,
}
)
@websocket_api.async_response
async def ws_get_history_during_period(
hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
"""Handle history during period websocket command."""
start_time_str = msg["start_time"]
end_time_str = msg.get("end_time")
if start_time := dt_util.parse_datetime(start_time_str):
start_time = dt_util.as_utc(start_time)
else:
connection.send_error(msg["id"], "invalid_start_time", "Invalid start_time")
return
if end_time_str:
if end_time := dt_util.parse_datetime(end_time_str):
end_time = dt_util.as_utc(end_time)
else:
connection.send_error(msg["id"], "invalid_end_time", "Invalid end_time")
return
else:
end_time = None
if start_time > dt_util.utcnow():
connection.send_result(msg["id"], {})
return
entity_ids = msg.get("entity_ids")
include_start_time_state = msg["include_start_time_state"]
if (
not include_start_time_state
and entity_ids
and not _entities_may_have_state_changes_after(hass, entity_ids, start_time)
):
connection.send_result(msg["id"], {})
return
significant_changes_only = msg["significant_changes_only"]
no_attributes = msg["no_attributes"]
minimal_response = msg["minimal_response"]
compressed_state_format = True
history_during_period: MutableMapping[
str, list[State | dict[str, Any]]
] = await get_instance(hass).async_add_executor_job(
history.get_significant_states,
hass,
start_time,
end_time,
entity_ids,
hass.data[HISTORY_FILTERS],
include_start_time_state,
significant_changes_only,
minimal_response,
no_attributes,
compressed_state_format,
)
connection.send_result(msg["id"], history_during_period)
class HistoryPeriodView(HomeAssistantView): class HistoryPeriodView(HomeAssistantView):
"""Handle history period requests.""" """Handle history period requests."""

View File

@@ -2,7 +2,7 @@
from __future__ import annotations

from collections import defaultdict
-from collections.abc import Iterable, Iterator, MutableMapping
+from collections.abc import Callable, Iterable, Iterator, MutableMapping
from datetime import datetime
from itertools import groupby
import logging
@@ -10,6 +10,7 @@ import time
from typing import Any, cast

from sqlalchemy import Column, Text, and_, bindparam, func, or_
+from sqlalchemy.engine.row import Row
from sqlalchemy.ext import baked
from sqlalchemy.ext.baked import BakedQuery
from sqlalchemy.orm.query import Query
@@ -17,6 +18,10 @@ from sqlalchemy.orm.session import Session
from sqlalchemy.sql.expression import literal

from homeassistant.components import recorder
+from homeassistant.components.websocket_api.const import (
+    COMPRESSED_STATE_LAST_CHANGED,
+    COMPRESSED_STATE_STATE,
+)
from homeassistant.core import HomeAssistant, State, split_entity_id
import homeassistant.util.dt as dt_util
@@ -25,8 +30,10 @@ from .models import (
    RecorderRuns,
    StateAttributes,
    States,
+    process_datetime_to_timestamp,
    process_timestamp,
    process_timestamp_to_utc_isoformat,
+    row_to_compressed_state,
)
from .util import execute, session_scope
@@ -161,6 +168,7 @@ def get_significant_states(
    significant_changes_only: bool = True,
    minimal_response: bool = False,
    no_attributes: bool = False,
+    compressed_state_format: bool = False,
) -> MutableMapping[str, list[State | dict[str, Any]]]:
    """Wrap get_significant_states_with_session with an sql session."""
    with session_scope(hass=hass) as session:
@@ -175,6 +183,7 @@ def get_significant_states(
            significant_changes_only,
            minimal_response,
            no_attributes,
+            compressed_state_format,
        )
@@ -199,7 +208,7 @@ def _query_significant_states_with_session(
    filters: Any = None,
    significant_changes_only: bool = True,
    no_attributes: bool = False,
-) -> list[States]:
+) -> list[Row]:
    """Query the database for significant state changes."""
    if _LOGGER.isEnabledFor(logging.DEBUG):
        timer_start = time.perf_counter()
@@ -271,6 +280,7 @@ def get_significant_states_with_session(
    significant_changes_only: bool = True,
    minimal_response: bool = False,
    no_attributes: bool = False,
+    compressed_state_format: bool = False,
) -> MutableMapping[str, list[State | dict[str, Any]]]:
    """
    Return states changes during UTC period start_time - end_time.
@@ -304,6 +314,7 @@ def get_significant_states_with_session(
        include_start_time_state,
        minimal_response,
        no_attributes,
+        compressed_state_format,
    )
@@ -541,7 +552,7 @@ def _get_states_baked_query_for_all(
    return baked_query


-def _get_states_with_session(
+def _get_rows_with_session(
    hass: HomeAssistant,
    session: Session,
    utc_point_in_time: datetime,
@@ -549,7 +560,7 @@ def _get_states_with_session(
    run: RecorderRuns | None = None,
    filters: Any | None = None,
    no_attributes: bool = False,
-) -> list[State]:
+) -> list[Row]:
    """Return the states at a specific point in time."""
    if entity_ids and len(entity_ids) == 1:
        return _get_single_entity_states_with_session(
@@ -570,17 +581,13 @@
    else:
        baked_query = _get_states_baked_query_for_all(hass, filters, no_attributes)

-    attr_cache: dict[str, dict[str, Any]] = {}
-    return [
-        LazyState(row, attr_cache)
-        for row in execute(
-            baked_query(session).params(
-                run_start=run.start,
-                utc_point_in_time=utc_point_in_time,
-                entity_ids=entity_ids,
-            )
-        )
-    ]
+    return execute(
+        baked_query(session).params(
+            run_start=run.start,
+            utc_point_in_time=utc_point_in_time,
+            entity_ids=entity_ids,
+        )
+    )


def _get_single_entity_states_with_session(
@@ -589,7 +596,7 @@ def _get_single_entity_states_with_session(
    utc_point_in_time: datetime,
    entity_id: str,
    no_attributes: bool = False,
-) -> list[State]:
+) -> list[Row]:
    # Use an entirely different (and extremely fast) query if we only
    # have a single entity id
    baked_query, join_attributes = bake_query_and_join_attributes(hass, no_attributes)
@@ -607,19 +614,20 @@
        utc_point_in_time=utc_point_in_time, entity_id=entity_id
    )

-    return [LazyState(row) for row in execute(query)]
+    return execute(query)


def _sorted_states_to_dict(
    hass: HomeAssistant,
    session: Session,
-    states: Iterable[States],
+    states: Iterable[Row],
    start_time: datetime,
    entity_ids: list[str] | None,
    filters: Any = None,
    include_start_time_state: bool = True,
    minimal_response: bool = False,
    no_attributes: bool = False,
+    compressed_state_format: bool = False,
) -> MutableMapping[str, list[State | dict[str, Any]]]:
    """Convert SQL results into JSON friendly data structure.
@@ -632,6 +640,19 @@ def _sorted_states_to_dict(
    each list of states, otherwise our graphs won't start on the Y
    axis correctly.
    """
+    if compressed_state_format:
+        state_class = row_to_compressed_state
+        _process_timestamp: Callable[
+            [datetime], float | str
+        ] = process_datetime_to_timestamp
+        attr_last_changed = COMPRESSED_STATE_LAST_CHANGED
+        attr_state = COMPRESSED_STATE_STATE
+    else:
+        state_class = LazyState  # type: ignore[assignment]
+        _process_timestamp = process_timestamp_to_utc_isoformat
+        attr_last_changed = LAST_CHANGED_KEY
+        attr_state = STATE_KEY
+
    result: dict[str, list[State | dict[str, Any]]] = defaultdict(list)
    # Set all entity IDs to empty lists in result set to maintain the order
    if entity_ids is not None:
@@ -640,27 +661,24 @@
    # Get the states at the start time
    timer_start = time.perf_counter()
+    initial_states: dict[str, Row] = {}
    if include_start_time_state:
-        for state in _get_states_with_session(
-            hass,
-            session,
-            start_time,
-            entity_ids,
-            filters=filters,
-            no_attributes=no_attributes,
-        ):
-            state.last_updated = start_time
-            state.last_changed = start_time
-            result[state.entity_id].append(state)
+        initial_states = {
+            row.entity_id: row
+            for row in _get_rows_with_session(
+                hass,
+                session,
+                start_time,
+                entity_ids,
+                filters=filters,
+                no_attributes=no_attributes,
+            )
+        }

    if _LOGGER.isEnabledFor(logging.DEBUG):
        elapsed = time.perf_counter() - timer_start
        _LOGGER.debug("getting %d first datapoints took %fs", len(result), elapsed)

-    # Called in a tight loop so cache the function
-    # here
-    _process_timestamp_to_utc_isoformat = process_timestamp_to_utc_isoformat
    if entity_ids and len(entity_ids) == 1:
        states_iter: Iterable[tuple[str | Column, Iterator[States]]] = (
            (entity_ids[0], iter(states)),
@@ -670,11 +688,15 @@
    # Append all changes to it
    for ent_id, group in states_iter:
-        ent_results = result[ent_id]
        attr_cache: dict[str, dict[str, Any]] = {}
+        prev_state: Column | str
+        ent_results = result[ent_id]
+        if row := initial_states.pop(ent_id, None):
+            prev_state = row.state
+            ent_results.append(state_class(row, attr_cache, start_time))
        if not minimal_response or split_entity_id(ent_id)[0] in NEED_ATTRIBUTE_DOMAINS:
-            ent_results.extend(LazyState(db_state, attr_cache) for db_state in group)
+            ent_results.extend(state_class(db_state, attr_cache) for db_state in group)
            continue

        # With minimal response we only provide a native
@@ -684,34 +706,35 @@ def _sorted_states_to_dict(
        if not ent_results:
            if (first_state := next(group, None)) is None:
                continue
-            ent_results.append(LazyState(first_state, attr_cache))
+            prev_state = first_state.state
+            ent_results.append(state_class(first_state, attr_cache))

-        assert isinstance(ent_results[-1], State)
-        prev_state: Column | str = ent_results[-1].state
        initial_state_count = len(ent_results)
+        row = None

-        db_state = None
-        for db_state in group:
+        for row in group:
            # With minimal response we do not care about attribute
            # changes so we can filter out duplicate states
-            if (state := db_state.state) == prev_state:
+            if (state := row.state) == prev_state:
                continue

            ent_results.append(
                {
-                    STATE_KEY: state,
-                    LAST_CHANGED_KEY: _process_timestamp_to_utc_isoformat(
-                        db_state.last_changed
-                    ),
+                    attr_state: state,
+                    attr_last_changed: _process_timestamp(row.last_changed),
                }
            )
            prev_state = state

-        if db_state and len(ent_results) != initial_state_count:
+        if row and len(ent_results) != initial_state_count:
            # There was at least one state change
            # replace the last minimal state with
            # a full state
-            ent_results[-1] = LazyState(db_state, attr_cache)
+            ent_results[-1] = state_class(row, attr_cache)
+
+    # If there are no states beyond the initial state,
+    # the state was never popped from initial_states
+    for ent_id, row in initial_states.items():
+        result[ent_id].append(state_class(row, {}, start_time))

    # Filter out the empty lists if some states had 0 results.
    return {key: val for key, val in result.items() if val}
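
Editor's note: the minimal-response branch above only emits a row when the state value actually changes. A self-contained toy model of that dedup logic (dedup_minimal and the plain dicts standing in for database rows are hypothetical, not recorder code):

from typing import Any

def dedup_minimal(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop consecutive rows whose state is unchanged, mimicking the
    prev_state comparison in _sorted_states_to_dict."""
    out: list[dict[str, Any]] = []
    prev_state = None
    for row in rows:
        if (state := row["state"]) == prev_state:
            continue
        out.append({"s": state, "lc": row["last_changed"]})
        prev_state = state
    return out

assert dedup_minimal(
    [
        {"state": "on", "last_changed": 1.0},
        {"state": "on", "last_changed": 2.0},
        {"state": "off", "last_changed": 3.0},
    ]
) == [{"s": "on", "lc": 1.0}, {"s": "off", "lc": 3.0}]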

View File

@@ -28,6 +28,12 @@ from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import declarative_base, relationship
from sqlalchemy.orm.session import Session

+from homeassistant.components.websocket_api.const import (
+    COMPRESSED_STATE_ATTRIBUTES,
+    COMPRESSED_STATE_LAST_CHANGED,
+    COMPRESSED_STATE_LAST_UPDATED,
+    COMPRESSED_STATE_STATE,
+)
from homeassistant.const import (
    MAX_LENGTH_EVENT_CONTEXT_ID,
    MAX_LENGTH_EVENT_EVENT_TYPE,
@@ -612,6 +618,13 @@ def process_timestamp_to_utc_isoformat(ts: datetime | None) -> str | None:
    return ts.astimezone(dt_util.UTC).isoformat()


+def process_datetime_to_timestamp(ts: datetime) -> float:
+    """Process a timestamp into a unix timestamp."""
+    if ts.tzinfo == dt_util.UTC:
+        return ts.timestamp()
+    return ts.replace(tzinfo=dt_util.UTC).timestamp()


class LazyState(State):
    """A lazy version of core State."""
@@ -621,45 +634,30 @@ class LazyState(State):
"_last_changed", "_last_changed",
"_last_updated", "_last_updated",
"_context", "_context",
"_attr_cache", "attr_cache",
] ]
def __init__( # pylint: disable=super-init-not-called def __init__( # pylint: disable=super-init-not-called
self, row: Row, attr_cache: dict[str, dict[str, Any]] | None = None self,
row: Row,
attr_cache: dict[str, dict[str, Any]],
start_time: datetime | None = None,
) -> None: ) -> None:
"""Init the lazy state.""" """Init the lazy state."""
self._row = row self._row = row
self.entity_id: str = self._row.entity_id self.entity_id: str = self._row.entity_id
self.state = self._row.state or "" self.state = self._row.state or ""
self._attributes: dict[str, Any] | None = None self._attributes: dict[str, Any] | None = None
self._last_changed: datetime | None = None self._last_changed: datetime | None = start_time
self._last_updated: datetime | None = None self._last_updated: datetime | None = start_time
self._context: Context | None = None self._context: Context | None = None
self._attr_cache = attr_cache self.attr_cache = attr_cache
@property # type: ignore[override] @property # type: ignore[override]
def attributes(self) -> dict[str, Any]: # type: ignore[override] def attributes(self) -> dict[str, Any]: # type: ignore[override]
"""State attributes.""" """State attributes."""
if self._attributes is None: if self._attributes is None:
source = self._row.shared_attrs or self._row.attributes self._attributes = decode_attributes_from_row(self._row, self.attr_cache)
if self._attr_cache is not None and (
attributes := self._attr_cache.get(source)
):
self._attributes = attributes
return attributes
if source == EMPTY_JSON_OBJECT or source is None:
self._attributes = {}
return self._attributes
try:
self._attributes = json.loads(source)
except ValueError:
# When json.loads fails
_LOGGER.exception(
"Error converting row to state attributes: %s", self._row
)
self._attributes = {}
if self._attr_cache is not None:
self._attr_cache[source] = self._attributes
return self._attributes return self._attributes
@attributes.setter @attributes.setter
@@ -748,3 +746,48 @@ class LazyState(State):
and self.state == other.state and self.state == other.state
and self.attributes == other.attributes and self.attributes == other.attributes
) )
+def decode_attributes_from_row(
+    row: Row, attr_cache: dict[str, dict[str, Any]]
+) -> dict[str, Any]:
+    """Decode attributes from a database row."""
+    source: str = row.shared_attrs or row.attributes
+    if (attributes := attr_cache.get(source)) is not None:
+        return attributes
+    if not source or source == EMPTY_JSON_OBJECT:
+        return {}
+    try:
+        attr_cache[source] = attributes = json.loads(source)
+    except ValueError:
+        _LOGGER.exception("Error converting row to state attributes: %s", source)
+        attr_cache[source] = attributes = {}
+    return attributes
+
+
+def row_to_compressed_state(
+    row: Row,
+    attr_cache: dict[str, dict[str, Any]],
+    start_time: datetime | None = None,
+) -> dict[str, Any]:
+    """Convert a database row to a compressed state."""
+    if start_time:
+        last_changed = last_updated = start_time.timestamp()
+    else:
+        row_changed_changed: datetime = row.last_changed
+        if (
+            not (row_last_updated := row.last_updated)
+            or row_last_updated == row_changed_changed
+        ):
+            last_changed = last_updated = process_datetime_to_timestamp(
+                row_changed_changed
+            )
+        else:
+            last_changed = process_datetime_to_timestamp(row_changed_changed)
+            last_updated = process_datetime_to_timestamp(row_last_updated)
+    return {
+        COMPRESSED_STATE_STATE: row.state,
+        COMPRESSED_STATE_ATTRIBUTES: decode_attributes_from_row(row, attr_cache),
+        COMPRESSED_STATE_LAST_CHANGED: last_changed,
+        COMPRESSED_STATE_LAST_UPDATED: last_updated,
+    }
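
Editor's note: a quick illustration of the naive-UTC assumption behind process_datetime_to_timestamp above. The recorder stores naive UTC datetimes, so a missing tzinfo is interpreted as UTC rather than local time; the datetime value here is illustrative.

from datetime import datetime, timezone

# 2022-05-11T12:00:00Z corresponds to unix timestamp 1652270400.0.
naive = datetime(2022, 5, 11, 12, 0, 0)
assert naive.replace(tzinfo=timezone.utc).timestamp() == 1652270400.0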

View File

@@ -56,3 +56,9 @@ DATA_CONNECTIONS: Final = f"{DOMAIN}.connections"
JSON_DUMP: Final = partial(
    json.dumps, cls=JSONEncoder, allow_nan=False, separators=(",", ":")
)

+COMPRESSED_STATE_STATE = "s"
+COMPRESSED_STATE_ATTRIBUTES = "a"
+COMPRESSED_STATE_CONTEXT = "c"
+COMPRESSED_STATE_LAST_CHANGED = "lc"
+COMPRESSED_STATE_LAST_UPDATED = "lu"

View File

@@ -16,6 +16,13 @@ from homeassistant.util.json import (
from homeassistant.util.yaml.loader import JSON_TYPE

from . import const
+from .const import (
+    COMPRESSED_STATE_ATTRIBUTES,
+    COMPRESSED_STATE_CONTEXT,
+    COMPRESSED_STATE_LAST_CHANGED,
+    COMPRESSED_STATE_LAST_UPDATED,
+    COMPRESSED_STATE_STATE,
+)

_LOGGER: Final = logging.getLogger(__name__)
@@ -31,12 +38,6 @@ BASE_COMMAND_MESSAGE_SCHEMA: Final = vol.Schema({vol.Required("id"): cv.positive
IDEN_TEMPLATE: Final = "__IDEN__"
IDEN_JSON_TEMPLATE: Final = '"__IDEN__"'

-COMPRESSED_STATE_STATE = "s"
-COMPRESSED_STATE_ATTRIBUTES = "a"
-COMPRESSED_STATE_CONTEXT = "c"
-COMPRESSED_STATE_LAST_CHANGED = "lc"
-COMPRESSED_STATE_LAST_UPDATED = "lu"
-
STATE_DIFF_ADDITIONS = "+"
STATE_DIFF_REMOVALS = "-"

View File

@@ -1070,3 +1070,409 @@ async def test_list_statistic_ids(
    response = await client.receive_json()
    assert response["success"]
    assert response["result"] == []
+
+
+async def test_history_during_period(hass, hass_ws_client, recorder_mock):
+    """Test history_during_period."""
+    now = dt_util.utcnow()
+
+    await async_setup_component(hass, "history", {})
+    await async_setup_component(hass, "sensor", {})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "attr"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "changed"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "again"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
+    await async_wait_recording_done(hass)
+
+    do_adhoc_statistics(hass, start=now)
+    await async_wait_recording_done(hass)
+
+    client = await hass_ws_client()
+    await client.send_json(
+        {
+            "id": 1,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "end_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {}
+
+    await client.send_json(
+        {
+            "id": 2,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": True,
+            "minimal_response": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 2
+
+    sensor_test_history = response["result"]["sensor.test"]
+    assert len(sensor_test_history) == 3
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert isinstance(sensor_test_history[0]["lc"], float)
+
+    assert "a" not in sensor_test_history[1]
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lc"], float)
+
+    assert sensor_test_history[2]["s"] == "on"
+    assert sensor_test_history[2]["a"] == {}
+
+    await client.send_json(
+        {
+            "id": 3,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": False,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 3
+    sensor_test_history = response["result"]["sensor.test"]
+
+    assert len(sensor_test_history) == 5
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {"any": "attr"}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert isinstance(sensor_test_history[0]["lc"], float)
+
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lc"], float)
+    assert sensor_test_history[1]["a"] == {"any": "attr"}
+
+    assert sensor_test_history[4]["s"] == "on"
+    assert sensor_test_history[4]["a"] == {"any": "attr"}
+
+    await client.send_json(
+        {
+            "id": 4,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": True,
+            "no_attributes": False,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 4
+    sensor_test_history = response["result"]["sensor.test"]
+
+    assert len(sensor_test_history) == 3
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {"any": "attr"}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert isinstance(sensor_test_history[0]["lc"], float)
+
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lc"], float)
+    assert sensor_test_history[1]["a"] == {"any": "attr"}
+
+    assert sensor_test_history[2]["s"] == "on"
+    assert sensor_test_history[2]["a"] == {"any": "attr"}
+
+
+async def test_history_during_period_impossible_conditions(
+    hass, hass_ws_client, recorder_mock
+):
+    """Test history_during_period returns when condition cannot be true."""
+    now = dt_util.utcnow()
+
+    await async_setup_component(hass, "history", {})
+    await async_setup_component(hass, "sensor", {})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "attr"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "changed"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "off", attributes={"any": "again"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("sensor.test", "on", attributes={"any": "attr"})
+    await async_wait_recording_done(hass)
+
+    do_adhoc_statistics(hass, start=now)
+    await async_wait_recording_done(hass)
+
+    after = dt_util.utcnow()
+
+    client = await hass_ws_client()
+    await client.send_json(
+        {
+            "id": 1,
+            "type": "history/history_during_period",
+            "start_time": after.isoformat(),
+            "end_time": after.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": False,
+            "significant_changes_only": False,
+            "no_attributes": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 1
+    assert response["result"] == {}
+
+    future = dt_util.utcnow() + timedelta(hours=10)
+
+    await client.send_json(
+        {
+            "id": 2,
+            "type": "history/history_during_period",
+            "start_time": future.isoformat(),
+            "entity_ids": ["sensor.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": True,
+            "no_attributes": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 2
+    assert response["result"] == {}
+
+
+@pytest.mark.parametrize(
+    "time_zone", ["UTC", "Europe/Berlin", "America/Chicago", "US/Hawaii"]
+)
+async def test_history_during_period_significant_domain(
+    time_zone, hass, hass_ws_client, recorder_mock
+):
+    """Test history_during_period with climate domain."""
+    hass.config.set_time_zone(time_zone)
+    now = dt_util.utcnow()
+
+    await async_setup_component(hass, "history", {})
+    await async_setup_component(hass, "sensor", {})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("climate.test", "on", attributes={"temperature": "1"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("climate.test", "off", attributes={"temperature": "2"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("climate.test", "off", attributes={"temperature": "3"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("climate.test", "off", attributes={"temperature": "4"})
+    await async_recorder_block_till_done(hass)
+    hass.states.async_set("climate.test", "on", attributes={"temperature": "5"})
+    await async_wait_recording_done(hass)
+
+    do_adhoc_statistics(hass, start=now)
+    await async_wait_recording_done(hass)
+
+    client = await hass_ws_client()
+    await client.send_json(
+        {
+            "id": 1,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "end_time": now.isoformat(),
+            "entity_ids": ["climate.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["result"] == {}
+
+    await client.send_json(
+        {
+            "id": 2,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["climate.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": True,
+            "minimal_response": True,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 2
+
+    sensor_test_history = response["result"]["climate.test"]
+    assert len(sensor_test_history) == 5
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert isinstance(sensor_test_history[0]["lc"], float)
+
+    assert "a" in sensor_test_history[1]
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lc"], float)
+
+    assert sensor_test_history[4]["s"] == "on"
+    assert sensor_test_history[4]["a"] == {}
+
+    await client.send_json(
+        {
+            "id": 3,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["climate.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": False,
+            "no_attributes": False,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 3
+    sensor_test_history = response["result"]["climate.test"]
+
+    assert len(sensor_test_history) == 5
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {"temperature": "1"}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert isinstance(sensor_test_history[0]["lc"], float)
+
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lc"], float)
+    assert sensor_test_history[1]["a"] == {"temperature": "2"}
+
+    assert sensor_test_history[4]["s"] == "on"
+    assert sensor_test_history[4]["a"] == {"temperature": "5"}
+
+    await client.send_json(
+        {
+            "id": 4,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "entity_ids": ["climate.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": True,
+            "no_attributes": False,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 4
+    sensor_test_history = response["result"]["climate.test"]
+
+    assert len(sensor_test_history) == 5
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {"temperature": "1"}
+    assert isinstance(sensor_test_history[0]["lu"], float)
+    assert isinstance(sensor_test_history[0]["lc"], float)
+
+    assert sensor_test_history[1]["s"] == "off"
+    assert isinstance(sensor_test_history[1]["lc"], float)
+    assert sensor_test_history[1]["a"] == {"temperature": "2"}
+
+    assert sensor_test_history[2]["s"] == "off"
+    assert sensor_test_history[2]["a"] == {"temperature": "3"}
+
+    assert sensor_test_history[3]["s"] == "off"
+    assert sensor_test_history[3]["a"] == {"temperature": "4"}
+
+    assert sensor_test_history[4]["s"] == "on"
+    assert sensor_test_history[4]["a"] == {"temperature": "5"}
+
+    # Test we impute the start time state
+    later = dt_util.utcnow()
+    await client.send_json(
+        {
+            "id": 5,
+            "type": "history/history_during_period",
+            "start_time": later.isoformat(),
+            "entity_ids": ["climate.test"],
+            "include_start_time_state": True,
+            "significant_changes_only": True,
+            "no_attributes": False,
+        }
+    )
+    response = await client.receive_json()
+    assert response["success"]
+    assert response["id"] == 5
+    sensor_test_history = response["result"]["climate.test"]
+
+    assert len(sensor_test_history) == 1
+
+    assert sensor_test_history[0]["s"] == "on"
+    assert sensor_test_history[0]["a"] == {"temperature": "5"}
+    assert sensor_test_history[0]["lu"] == later.timestamp()
+    assert sensor_test_history[0]["lc"] == later.timestamp()
+
+
+async def test_history_during_period_bad_start_time(
+    hass, hass_ws_client, recorder_mock
+):
+    """Test history_during_period bad start time."""
+    await async_setup_component(
+        hass,
+        "history",
+        {"history": {}},
+    )
+
+    client = await hass_ws_client()
+    await client.send_json(
+        {
+            "id": 1,
+            "type": "history/history_during_period",
+            "start_time": "cats",
+        }
+    )
+    response = await client.receive_json()
+    assert not response["success"]
+    assert response["error"]["code"] == "invalid_start_time"
+
+
+async def test_history_during_period_bad_end_time(hass, hass_ws_client, recorder_mock):
+    """Test history_during_period bad end time."""
+    now = dt_util.utcnow()
+
+    await async_setup_component(
+        hass,
+        "history",
+        {"history": {}},
+    )
+
+    client = await hass_ws_client()
+    await client.send_json(
+        {
+            "id": 1,
+            "type": "history/history_during_period",
+            "start_time": now.isoformat(),
+            "end_time": "dogs",
+        }
+    )
+    response = await client.receive_json()
+    assert not response["success"]
+    assert response["error"]["code"] == "invalid_end_time"

View File

@@ -14,6 +14,7 @@ from homeassistant.components import recorder
from homeassistant.components.recorder import history
from homeassistant.components.recorder.models import (
    Events,
+    LazyState,
    RecorderRuns,
    StateAttributes,
    States,
@@ -40,9 +41,19 @@ async def _async_get_states(
    def _get_states_with_session():
        with session_scope(hass=hass) as session:
-            return history._get_states_with_session(
-                hass, session, utc_point_in_time, entity_ids, run, None, no_attributes
-            )
+            attr_cache = {}
+            return [
+                LazyState(row, attr_cache)
+                for row in history._get_rows_with_session(
+                    hass,
+                    session,
+                    utc_point_in_time,
+                    entity_ids,
+                    run,
+                    None,
+                    no_attributes,
+                )
+            ]

    return await recorder.get_instance(hass).async_add_executor_job(
        _get_states_with_session

View File

@@ -247,7 +247,7 @@ async def test_lazy_state_handles_include_json(caplog):
entity_id="sensor.invalid", entity_id="sensor.invalid",
shared_attrs="{INVALID_JSON}", shared_attrs="{INVALID_JSON}",
) )
assert LazyState(row).attributes == {} assert LazyState(row, {}).attributes == {}
assert "Error converting row to state attributes" in caplog.text assert "Error converting row to state attributes" in caplog.text
@@ -258,7 +258,7 @@ async def test_lazy_state_prefers_shared_attrs_over_attrs(caplog):
shared_attrs='{"shared":true}', shared_attrs='{"shared":true}',
attributes='{"shared":false}', attributes='{"shared":false}',
) )
assert LazyState(row).attributes == {"shared": True} assert LazyState(row, {}).attributes == {"shared": True}
async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog): async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog):
@@ -271,7 +271,7 @@ async def test_lazy_state_handles_different_last_updated_and_last_changed(caplog
last_updated=now, last_updated=now,
last_changed=now - timedelta(seconds=60), last_changed=now - timedelta(seconds=60),
) )
lstate = LazyState(row) lstate = LazyState(row, {})
assert lstate.as_dict() == { assert lstate.as_dict() == {
"attributes": {"shared": True}, "attributes": {"shared": True},
"entity_id": "sensor.valid", "entity_id": "sensor.valid",
@@ -300,7 +300,7 @@ async def test_lazy_state_handles_same_last_updated_and_last_changed(caplog):
last_updated=now, last_updated=now,
last_changed=now, last_changed=now,
) )
lstate = LazyState(row) lstate = LazyState(row, {})
assert lstate.as_dict() == { assert lstate.as_dict() == {
"attributes": {"shared": True}, "attributes": {"shared": True},
"entity_id": "sensor.valid", "entity_id": "sensor.valid",