diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py index 166982d9496..f78d25d0639 100644 --- a/homeassistant/components/apache_kafka/__init__.py +++ b/homeassistant/components/apache_kafka/__init__.py @@ -1,11 +1,8 @@ """Support for Apache Kafka.""" -import asyncio from datetime import datetime import json import logging -from aiokafka import AIOKafkaProducer -from aiokafka.errors import KafkaError import voluptuous as vol from homeassistant.const import ( @@ -31,24 +28,46 @@ CONFIG_SCHEMA = vol.Schema({ }, extra=vol.ALLOW_EXTRA) -async def async_setup(hass, config): +def setup(hass, config): """Activate the Apache Kafka integration.""" + from aiokafka import AIOKafkaProducer + conf = config[DOMAIN] + topic_name = conf[CONF_TOPIC] + entities_filter = conf[CONF_FILTER] - kafka = hass.data[DOMAIN] = KafkaManager( - hass, - conf[CONF_IP_ADDRESS], - conf[CONF_PORT], - conf[CONF_TOPIC], - conf[CONF_FILTER]) + producer = AIOKafkaProducer( + loop=hass.loop, + bootstrap_servers="{0}:{1}".format( + conf[CONF_IP_ADDRESS], conf[CONF_PORT]), + compression_type="gzip", + ) - hass.bus.listen(EVENT_HOMEASSISTANT_STOP, kafka.shutdown()) + encoder = DateTimeJSONEncoder() - try: - await kafka.start() - except asyncio.TimeoutError: - _LOGGER.error('Timed out while connecting to Kafka') - return False + async def send_to_pubsub(event): + """Send states to Pub/Sub.""" + await producer.start() + + state = event.data.get('new_state') + if (state is None + or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) + or not entities_filter(state.entity_id)): + return + + as_dict = state.as_dict() + data = json.dumps( + obj=as_dict, + default=encoder.encode + ).encode('utf-8') + + try: + await producer.send_and_wait(topic_name, data) + finally: + producer.stop() + + hass.bus.listen(EVENT_HOMEASSISTANT_STOP, producer.stop()) + hass.bus.listen(EVENT_STATE_CHANGED, send_to_pubsub) return True @@ -64,51 +83,3 @@ class DateTimeJSONEncoder(json.JSONEncoder): if isinstance(o, datetime): return o.isoformat() return super().default(o) - - -class KafkaManager: - """Define a manager to buffer events to Kafka.""" - - def __init__( - self, - hass, - ip_address, - port, - topic, - entities_filter): - """Initialize.""" - self._encoder = DateTimeJSONEncoder() - self._entities_filter = entities_filter - self._producer = AIOKafkaProducer( - loop=hass.loop, - bootstrap_servers="{0}:{1}".format(ip_address, port), - compression_type="gzip", - ) - self._topic = topic - - hass.bus.listen(EVENT_STATE_CHANGED, self._write_to_kafka) - - def _encode_event(self, event): - """Translate events into a binary JSON payload.""" - state = event.data.get('new_state') - if (state is None - or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE) - or not self._entities_filter(state.entity_id)): - return - - return json.dumps( - obj=state.as_dict(), - default=self._encoder.encode - ).encode('utf-8') - - async def _write_to_kafka(self, event): - """Write a binary payload to Kafka.""" - await self._producer.send_and_wait(self._topic, event) - - async def start(self): - """Start the Kafka manager.""" - asyncio.wait_for(self._producer.start(), timeout=5) - - async def shutdown(self): - """Shut the manager down.""" - await self._producer.stop()