Simplified

This commit is contained in:
Aaron Bach
2019-07-11 12:41:10 -06:00
parent aa0e0a90b0
commit fde4624e07

View File

@ -1,8 +1,11 @@
"""Support for Apache Kafka.""" """Support for Apache Kafka."""
import asyncio
from datetime import datetime from datetime import datetime
import json import json
import logging import logging
from aiokafka import AIOKafkaProducer
from aiokafka.errors import KafkaError
import voluptuous as vol import voluptuous as vol
from homeassistant.const import ( from homeassistant.const import (
@ -28,46 +31,24 @@ CONFIG_SCHEMA = vol.Schema({
}, extra=vol.ALLOW_EXTRA) }, extra=vol.ALLOW_EXTRA)
def setup(hass, config): async def async_setup(hass, config):
"""Activate the Apache Kafka integration.""" """Activate the Apache Kafka integration."""
from aiokafka import AIOKafkaProducer
conf = config[DOMAIN] conf = config[DOMAIN]
topic_name = conf[CONF_TOPIC]
entities_filter = conf[CONF_FILTER]
producer = AIOKafkaProducer( kafka = hass.data[DOMAIN] = KafkaManager(
loop=hass.loop, hass,
bootstrap_servers="{0}:{1}".format( conf[CONF_IP_ADDRESS],
conf[CONF_IP_ADDRESS], conf[CONF_PORT]), conf[CONF_PORT],
compression_type="gzip", conf[CONF_TOPIC],
) conf[CONF_FILTER])
encoder = DateTimeJSONEncoder() hass.bus.listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown())
async def send_to_pubsub(event): try:
"""Send states to Pub/Sub.""" await kafka.start()
await producer.start() except asyncio.TimeoutError:
_LOGGER.error('Timed out while connecting to Kafka')
state = event.data.get('new_state') return False
if (state is None
or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE)
or not entities_filter(state.entity_id)):
return
as_dict = state.as_dict()
data = json.dumps(
obj=as_dict,
default=encoder.encode
).encode('utf-8')
try:
await producer.send_and_wait(topic_name, data)
finally:
producer.stop()
hass.bus.listen(EVENT_HOMEASSISTANT_STOP, producer.stop())
hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub)
return True return True
@ -83,3 +64,51 @@ class DateTimeJSONEncoder(json.JSONEncoder):
if isinstance(o, datetime): if isinstance(o, datetime):
return o.isoformat() return o.isoformat()
return super().default(o) return super().default(o)
class KafkaManager:
"""Define a manager to buffer events to Kafka."""
def __init__(
self,
hass,
ip_address,
port,
topic,
entities_filter):
"""Initialize."""
self._encoder = DateTimeJSONEncoder()
self._entities_filter = entities_filter
self._producer = AIOKafkaProducer(
loop=hass.loop,
bootstrap_servers="{0}:{1}".format(ip_address, port),
compression_type="gzip",
)
self._topic = topic
hass.bus.listen(EVENT_STATE_CHANGED, self._write_to_kafka)
def _encode_event(self, event):
"""Translate events into a binary JSON payload."""
state = event.data.get('new_state')
if (state is None
or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE)
or not self._entities_filter(state.entity_id)):
return
return json.dumps(
obj=state.as_dict(),
default=self._encoder.encode
).encode('utf-8')
async def _write_to_kafka(self, event):
"""Write a binary payload to Kafka."""
await self._producer.send_and_wait(self._topic, event)
async def start(self):
"""Start the Kafka manager."""
asyncio.wait_for(self._producer.start(), timeout=5)
async def shutdown(self):
"""Shut the manager down."""
await self._producer.stop()