From aa0e0a90b0a73c02654a73e531291fb0d1087d78 Mon Sep 17 00:00:00 2001
From: Aaron Bach
Date: Sun, 23 Jun 2019 15:04:23 -0600
Subject: [PATCH] Add support for Apache Kafka

---
 .../components/apache_kafka/__init__.py       | 88 +++++++++++++++++++
 .../components/apache_kafka/manifest.json     | 10 +++
 2 files changed, 98 insertions(+)
 create mode 100644 homeassistant/components/apache_kafka/__init__.py
 create mode 100644 homeassistant/components/apache_kafka/manifest.json

diff --git a/homeassistant/components/apache_kafka/__init__.py b/homeassistant/components/apache_kafka/__init__.py
new file mode 100644
index 00000000000..f78d25d0639
--- /dev/null
+++ b/homeassistant/components/apache_kafka/__init__.py
@@ -0,0 +1,88 @@
+"""Support for Apache Kafka."""
+import asyncio
+from datetime import datetime
+import json
+import logging
+
+import voluptuous as vol
+
+from homeassistant.const import (
+    CONF_IP_ADDRESS, CONF_PORT, EVENT_HOMEASSISTANT_STOP, EVENT_STATE_CHANGED,
+    STATE_UNAVAILABLE, STATE_UNKNOWN)
+import homeassistant.helpers.config_validation as cv
+from homeassistant.helpers.entityfilter import FILTER_SCHEMA
+
+_LOGGER = logging.getLogger(__name__)
+
+DOMAIN = 'apache_kafka'
+
+CONF_FILTER = 'filter'
+CONF_TOPIC = 'topic'
+
+CONFIG_SCHEMA = vol.Schema({
+    DOMAIN: vol.Schema({
+        vol.Required(CONF_IP_ADDRESS): cv.string,
+        vol.Required(CONF_PORT): cv.port,
+        vol.Required(CONF_TOPIC): cv.string,
+        vol.Optional(CONF_FILTER, default={}): FILTER_SCHEMA,
+    }),
+}, extra=vol.ALLOW_EXTRA)
+
+
+def setup(hass, config):
+    """Activate the Apache Kafka integration."""
+    from aiokafka import AIOKafkaProducer
+
+    conf = config[DOMAIN]
+    topic_name = conf[CONF_TOPIC]
+    entities_filter = conf[CONF_FILTER]
+
+    producer = AIOKafkaProducer(
+        loop=hass.loop,
+        bootstrap_servers="{0}:{1}".format(
+            conf[CONF_IP_ADDRESS], conf[CONF_PORT]),
+        compression_type="gzip",
+    )
+
+    encoder = DateTimeJSONEncoder()
+
+    # Start the producer exactly once. setup() runs in an executor
+    # thread, so schedule start() on the event loop and block until it
+    # completes (AIOKafkaProducer cannot be restarted after stop()).
+    asyncio.run_coroutine_threadsafe(producer.start(), hass.loop).result()
+
+    async def shutdown(event):
+        """Flush pending messages and stop the Kafka producer."""
+        await producer.stop()
+
+    async def send_to_kafka(event):
+        """Forward eligible state changes to the Kafka topic."""
+        state = event.data.get('new_state')
+        if (state is None
+                or state.state in (STATE_UNKNOWN, '', STATE_UNAVAILABLE)
+                or not entities_filter(state.entity_id)):
+            return
+
+        # default= must be the per-object hook, not encode(): encode()
+        # returns a full JSON document and would double-encode datetimes.
+        data = json.dumps(
+            obj=state.as_dict(),
+            default=encoder.default
+        ).encode('utf-8')
+
+        await producer.send_and_wait(topic_name, data)
+
+    # Pass the coroutine functions themselves; calling them here would
+    # register a one-shot coroutine object (and stop the producer now).
+    hass.bus.listen(EVENT_HOMEASSISTANT_STOP, shutdown)
+    hass.bus.listen(EVENT_STATE_CHANGED, send_to_kafka)
+
+    return True
+
+
+class DateTimeJSONEncoder(json.JSONEncoder):
+    """Encode python objects.
+
+    Additionally add encoding for datetime objects as isoformat.
+    """
+
+    def default(self, o):  # pylint: disable=E0202
+        """Implement encoding logic."""
+        if isinstance(o, datetime):
+            return o.isoformat()
+        return super().default(o)
diff --git a/homeassistant/components/apache_kafka/manifest.json b/homeassistant/components/apache_kafka/manifest.json
new file mode 100644
index 00000000000..29d3d4bfcdc
--- /dev/null
+++ b/homeassistant/components/apache_kafka/manifest.json
@@ -0,0 +1,10 @@
+{
+  "domain": "apache_kafka",
+  "name": "Apache Kafka",
+  "documentation": "https://www.home-assistant.io/components/apache_kafka",
+  "requirements": [
+    "aiokafka==0.5.1"
+  ],
+  "dependencies": [],
+  "codeowners": []
+}