mirror of
https://github.com/home-assistant/core.git
synced 2026-04-20 08:29:39 +02:00
Make influxdb batch settings configurable (#134758)
Co-authored-by: Erik Montnemery <erik@montnemery.com>
This commit is contained in:
@@ -54,14 +54,15 @@ from homeassistant.helpers.typing import ConfigType
|
||||
|
||||
from .const import (
|
||||
API_VERSION_2,
|
||||
BATCH_BUFFER_SIZE,
|
||||
BATCH_TIMEOUT,
|
||||
CATCHING_UP_MESSAGE,
|
||||
CLIENT_ERROR_V1,
|
||||
CLIENT_ERROR_V2,
|
||||
CODE_INVALID_INPUTS,
|
||||
COMPONENT_CONFIG_SCHEMA_BATCH,
|
||||
COMPONENT_CONFIG_SCHEMA_CONNECTION,
|
||||
CONF_API_VERSION,
|
||||
CONF_BATCH_BUFFER_SIZE,
|
||||
CONF_BATCH_TIMEOUT,
|
||||
CONF_BUCKET,
|
||||
CONF_COMPONENT_CONFIG,
|
||||
CONF_COMPONENT_CONFIG_DOMAIN,
|
||||
@@ -193,7 +194,12 @@ _INFLUX_BASE_SCHEMA = INCLUDE_EXCLUDE_BASE_FILTER_SCHEMA.extend(
|
||||
)
|
||||
|
||||
INFLUX_SCHEMA = vol.All(
|
||||
_INFLUX_BASE_SCHEMA.extend(COMPONENT_CONFIG_SCHEMA_CONNECTION),
|
||||
_INFLUX_BASE_SCHEMA.extend(
|
||||
{
|
||||
**COMPONENT_CONFIG_SCHEMA_CONNECTION,
|
||||
**COMPONENT_CONFIG_SCHEMA_BATCH,
|
||||
}
|
||||
),
|
||||
validate_version_specific_config,
|
||||
create_influx_url,
|
||||
)
|
||||
@@ -496,7 +502,9 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
|
||||
event_to_json = _generate_event_to_json(conf)
|
||||
max_tries = conf.get(CONF_RETRY_COUNT)
|
||||
instance = hass.data[DOMAIN] = InfluxThread(hass, influx, event_to_json, max_tries)
|
||||
instance = hass.data[DOMAIN] = InfluxThread(
|
||||
hass, influx, event_to_json, max_tries, conf
|
||||
)
|
||||
instance.start()
|
||||
|
||||
def shutdown(event):
|
||||
@@ -513,7 +521,7 @@ def setup(hass: HomeAssistant, config: ConfigType) -> bool:
|
||||
class InfluxThread(threading.Thread):
|
||||
"""A threaded event handler class."""
|
||||
|
||||
def __init__(self, hass, influx, event_to_json, max_tries):
|
||||
def __init__(self, hass, influx, event_to_json, max_tries, config):
|
||||
"""Initialize the listener."""
|
||||
threading.Thread.__init__(self, name=DOMAIN)
|
||||
self.queue: queue.SimpleQueue[threading.Event | tuple[float, Event] | None] = (
|
||||
@@ -524,6 +532,8 @@ class InfluxThread(threading.Thread):
|
||||
self.max_tries = max_tries
|
||||
self.write_errors = 0
|
||||
self.shutdown = False
|
||||
self._batch_timeout = config[CONF_BATCH_TIMEOUT]
|
||||
self.batch_buffer_size = config[CONF_BATCH_BUFFER_SIZE]
|
||||
hass.bus.listen(EVENT_STATE_CHANGED, self._event_listener)
|
||||
|
||||
@callback
|
||||
@@ -532,23 +542,31 @@ class InfluxThread(threading.Thread):
|
||||
item = (time.monotonic(), event)
|
||||
self.queue.put(item)
|
||||
|
||||
@staticmethod
|
||||
def batch_timeout():
|
||||
@property
|
||||
def batch_timeout(self):
|
||||
"""Return number of seconds to wait for more events."""
|
||||
return BATCH_TIMEOUT
|
||||
return self._batch_timeout
|
||||
|
||||
def get_events_json(self):
|
||||
"""Return a batch of events formatted for writing."""
|
||||
queue_seconds = QUEUE_BACKLOG_SECONDS + self.max_tries * RETRY_DELAY
|
||||
start_time = time.monotonic()
|
||||
batch_timeout = self.batch_timeout()
|
||||
|
||||
count = 0
|
||||
json = []
|
||||
|
||||
dropped = 0
|
||||
|
||||
with suppress(queue.Empty):
|
||||
while len(json) < BATCH_BUFFER_SIZE and not self.shutdown:
|
||||
timeout = None if count == 0 else self.batch_timeout()
|
||||
while len(json) < self.batch_buffer_size and not self.shutdown:
|
||||
if count > 0 and time.monotonic() - start_time >= batch_timeout:
|
||||
break
|
||||
|
||||
timeout = (
|
||||
None
|
||||
if count == 0
|
||||
else batch_timeout - (time.monotonic() - start_time)
|
||||
)
|
||||
item = self.queue.get(timeout=timeout)
|
||||
count += 1
|
||||
|
||||
|
||||
@@ -47,6 +47,9 @@ CONF_FUNCTION = "function"
|
||||
CONF_QUERY = "query"
|
||||
CONF_IMPORTS = "imports"
|
||||
|
||||
CONF_BATCH_BUFFER_SIZE = "batch_buffer_size"
|
||||
CONF_BATCH_TIMEOUT = "batch_timeout"
|
||||
|
||||
DEFAULT_DATABASE = "home_assistant"
|
||||
DEFAULT_HOST_V2 = "us-west-2-1.aws.cloud2.influxdata.com"
|
||||
DEFAULT_SSL_V2 = True
|
||||
@@ -60,6 +63,9 @@ DEFAULT_RANGE_STOP = "now()"
|
||||
DEFAULT_FUNCTION_FLUX = "|> limit(n: 1)"
|
||||
DEFAULT_MEASUREMENT_ATTR = "unit_of_measurement"
|
||||
|
||||
DEFAULT_BATCH_BUFFER_SIZE = 100
|
||||
DEFAULT_BATCH_TIMEOUT = 1
|
||||
|
||||
INFLUX_CONF_MEASUREMENT = "measurement"
|
||||
INFLUX_CONF_TAGS = "tags"
|
||||
INFLUX_CONF_TIME = "time"
|
||||
@@ -76,8 +82,6 @@ TIMEOUT = 10 # seconds
|
||||
RETRY_DELAY = 20
|
||||
QUEUE_BACKLOG_SECONDS = 30
|
||||
RETRY_INTERVAL = 60 # seconds
|
||||
BATCH_TIMEOUT = 1
|
||||
BATCH_BUFFER_SIZE = 100
|
||||
LANGUAGE_INFLUXQL = "influxQL"
|
||||
LANGUAGE_FLUX = "flux"
|
||||
TEST_QUERY_V1 = "SHOW DATABASES;"
|
||||
@@ -152,3 +156,10 @@ COMPONENT_CONFIG_SCHEMA_CONNECTION = {
|
||||
vol.Inclusive(CONF_ORG, "v2_authentication"): cv.string,
|
||||
vol.Optional(CONF_BUCKET, default=DEFAULT_BUCKET): cv.string,
|
||||
}
|
||||
|
||||
COMPONENT_CONFIG_SCHEMA_BATCH = {
|
||||
vol.Optional(
|
||||
CONF_BATCH_BUFFER_SIZE, default=DEFAULT_BATCH_BUFFER_SIZE
|
||||
): cv.positive_int,
|
||||
vol.Optional(CONF_BATCH_TIMEOUT, default=DEFAULT_BATCH_TIMEOUT): cv.positive_float,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user