mirror of
https://github.com/vikt0rm/dbus-goecharger.git
synced 2025-06-24 17:01:44 +02:00
350 lines
13 KiB
Python
Executable File
350 lines
13 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# import normal packages
|
|
import platform
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
import sys
|
|
import os
|
|
import sys
|
|
if sys.version_info.major == 2:
|
|
import gobject
|
|
else:
|
|
from gi.repository import GLib as gobject
|
|
import sys
|
|
import time
|
|
import requests # for http GET
|
|
import configparser # for config/ini file
|
|
|
|
# our own packages from victron
|
|
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
|
|
from vedbus import VeDbusService
|
|
|
|
|
|
class DbusGoeChargerService:
|
|
def __init__(self, servicename, paths, productname='go-eCharger', connection='go-eCharger HTTP JSON service'):
|
|
config = self._getConfig()
|
|
deviceinstance = int(config['DEFAULT']['Deviceinstance'])
|
|
hardwareVersion = int(config['DEFAULT']['HardwareVersion'])
|
|
acPosition = int(config['DEFAULT']['AcPosition'])
|
|
pauseBetweenRequests = int(config['ONPREMISE']['PauseBetweenRequests']) # in ms
|
|
|
|
if pauseBetweenRequests <= 20:
|
|
raise ValueError("Pause between requests must be greater than 20")
|
|
|
|
self._dbusservice = VeDbusService("{}.http_{:02d}".format(servicename, deviceinstance), register=False)
|
|
self._paths = paths
|
|
|
|
logging.debug("%s /DeviceInstance = %d" % (servicename, deviceinstance))
|
|
|
|
paths_wo_unit = [
|
|
'/Status', # value 'car' 1: charging station ready, no vehicle 2: vehicle loads 3: Waiting for vehicle 4: Charge finished, vehicle still connected
|
|
'/Mode'
|
|
]
|
|
|
|
#get data from go-eCharger
|
|
data = self._getGoeChargerData('sse,fwv')
|
|
|
|
# Create the management objects, as specified in the ccgx dbus-api document
|
|
self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
|
|
self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version())
|
|
self._dbusservice.add_path('/Mgmt/Connection', connection)
|
|
|
|
# Create the mandatory objects
|
|
self._dbusservice.add_path('/DeviceInstance', deviceinstance)
|
|
self._dbusservice.add_path('/ProductId', 0xFFFF) #
|
|
self._dbusservice.add_path('/ProductName', productname)
|
|
self._dbusservice.add_path('/CustomName', productname)
|
|
if data:
|
|
fwv = data['fwv']
|
|
try:
|
|
fwv = int(data['fwv'].replace('.', ''))
|
|
except:
|
|
pass
|
|
self._dbusservice.add_path('/FirmwareVersion', fwv)
|
|
self._dbusservice.add_path('/Serial', data['sse'])
|
|
self._dbusservice.add_path('/HardwareVersion', hardwareVersion)
|
|
self._dbusservice.add_path('/Connected', 1)
|
|
self._dbusservice.add_path('/UpdateIndex', 0)
|
|
self._dbusservice.add_path('/Position', acPosition)
|
|
|
|
# add paths without units
|
|
for path in paths_wo_unit:
|
|
self._dbusservice.add_path(path, None)
|
|
|
|
# add path values to dbus
|
|
for path, settings in self._paths.items():
|
|
self._dbusservice.add_path(
|
|
path, settings['initial'], gettextcallback=settings['textformat'], writeable=True, onchangecallback=self._handlechangedvalue)
|
|
|
|
# register the service
|
|
self._dbusservice.register()
|
|
|
|
# last update
|
|
self._lastUpdate = 0
|
|
|
|
# charging time in float
|
|
self._chargingTime = 0.0
|
|
|
|
# add _update function 'timer'
|
|
gobject.timeout_add(pauseBetweenRequests, self._update)
|
|
|
|
# add _signOfLife 'timer' to get feedback in log every 5minutes
|
|
gobject.timeout_add(self._getSignOfLifeInterval()*60*1000, self._signOfLife)
|
|
|
|
def _getConfig(self):
|
|
config = configparser.ConfigParser()
|
|
config.read("%s/config.ini" % (os.path.dirname(os.path.realpath(__file__))))
|
|
return config
|
|
|
|
|
|
def _getSignOfLifeInterval(self):
|
|
config = self._getConfig()
|
|
value = config['DEFAULT']['SignOfLifeLog']
|
|
|
|
if not value:
|
|
value = 0
|
|
|
|
return int(value)
|
|
|
|
|
|
def _getGoeChargerStatusUrl(self):
|
|
config = self._getConfig()
|
|
accessType = config['DEFAULT']['AccessType']
|
|
|
|
if accessType == 'OnPremise':
|
|
URL = "http://%s/api/status" % (config['ONPREMISE']['Host'])
|
|
else:
|
|
raise ValueError("AccessType %s is not supported" % (config['DEFAULT']['AccessType']))
|
|
|
|
return URL
|
|
|
|
def _getGoeChargerMqttPayloadUrl(self, parameter, value):
|
|
config = self._getConfig()
|
|
accessType = config['DEFAULT']['AccessType']
|
|
|
|
if accessType == 'OnPremise':
|
|
URL = "http://%s/mqtt?payload=%s=%s" % (config['ONPREMISE']['Host'], parameter, value)
|
|
else:
|
|
raise ValueError("AccessType %s is not supported" % (config['DEFAULT']['AccessType']))
|
|
|
|
return URL
|
|
|
|
def _setGoeChargerValue(self, parameter, value):
|
|
URL = self._getGoeChargerMqttPayloadUrl(parameter, str(value))
|
|
request_data = requests.get(url = URL)
|
|
|
|
# check for response
|
|
if not request_data:
|
|
raise ConnectionError("No response from go-eCharger - %s" % (URL))
|
|
|
|
json_data = request_data.json()
|
|
|
|
# check for Json
|
|
if not json_data:
|
|
raise ValueError("Converting response to JSON failed")
|
|
|
|
if json_data[parameter] == str(value):
|
|
return True
|
|
else:
|
|
logging.warning("go-eCharger parameter %s not set to %s" % (parameter, str(value)))
|
|
return False
|
|
|
|
|
|
def _getGoeChargerData(self, filter):
|
|
URL = "%s?filter=%s" % (self._getGoeChargerStatusUrl(), filter)
|
|
try:
|
|
request_data = requests.get(url = URL, timeout=1)
|
|
except Exception:
|
|
return None
|
|
|
|
# check for response
|
|
if not request_data:
|
|
raise ConnectionError("No response from go-eCharger - %s" % (URL))
|
|
|
|
json_data = request_data.json()
|
|
|
|
# check for Json
|
|
if not json_data:
|
|
raise ValueError("Converting response to JSON failed")
|
|
|
|
|
|
return json_data
|
|
|
|
|
|
def _signOfLife(self):
|
|
logging.info("--- Start: sign of life ---")
|
|
logging.info("Last _update() call: %s" % (self._lastUpdate))
|
|
logging.info("Last '/Ac/Power': %s" % (self._dbusservice['/Ac/Power']))
|
|
logging.info("--- End: sign of life ---")
|
|
return True
|
|
|
|
def _update(self):
|
|
try:
|
|
#get data from go-eCharger
|
|
data = self._getGoeChargerData('nrg,eto,wh,alw,amp,ama,car,tmp,tma')
|
|
|
|
if data is not None:
|
|
|
|
'''
|
|
data['nrg']
|
|
0 = U L1
|
|
1 = U L2
|
|
2 = U L3
|
|
3 = U N
|
|
4 = I L1
|
|
5 = I L2
|
|
6 = I L3
|
|
7 = P L1
|
|
8 = P L2
|
|
9 = P L3
|
|
10 = P N
|
|
11 = P Total
|
|
12 = PF L1
|
|
13 = PF L2
|
|
14 = PF L3
|
|
15 = PF N
|
|
'''
|
|
config = self._getConfig()
|
|
hardwareVersion = int(config['DEFAULT']['HardwareVersion'])
|
|
|
|
#send data to DBus
|
|
self._dbusservice['/Ac/Voltage'] = int(data['nrg'][0])
|
|
self._dbusservice['/Ac/L1/Power'] = int(data['nrg'][7])
|
|
self._dbusservice['/Ac/L2/Power'] = int(data['nrg'][8])
|
|
self._dbusservice['/Ac/L3/Power'] = int(data['nrg'][9])
|
|
self._dbusservice['/Ac/Power'] = int(data['nrg'][11])
|
|
self._dbusservice['/Current'] = max(data['nrg'][4], data['nrg'][5], data['nrg'][6])
|
|
if int(hardwareVersion) < 4:
|
|
self._dbusservice['/Ac/Energy/Forward'] = int(float(data['eto']) / 1000.0)
|
|
else:
|
|
self._dbusservice['/Ac/Energy/Forward'] = round(data['wh'] / 1000, 2)
|
|
|
|
self._dbusservice['/StartStop'] = int(data['alw'])
|
|
self._dbusservice['/SetCurrent'] = int(data['amp'])
|
|
self._dbusservice['/MaxCurrent'] = int(data['ama'])
|
|
|
|
# update chargingTime, increment charge time only on active charging (2), reset when no car connected (1)
|
|
timeDelta = time.time() - self._lastUpdate
|
|
if int(data['car']) == 2 and self._lastUpdate > 0: # vehicle loads
|
|
self._chargingTime += timeDelta
|
|
elif int(data['car']) == 1: # charging station ready, no vehicle
|
|
self._chargingTime = 0
|
|
self._dbusservice['/ChargingTime'] = int(self._chargingTime)
|
|
|
|
self._dbusservice['/Mode'] = 0 # Manual, no control
|
|
|
|
config = self._getConfig()
|
|
hardwareVersion = int(config['DEFAULT']['HardwareVersion'])
|
|
if '/MCU/Temperature' in self._dbusservice: # check if path exists, at some point it was removed
|
|
if hardwareVersion >= 3:
|
|
self._dbusservice['/MCU/Temperature'] = int(data['tma'][0] if data['tma'][0] else 0)
|
|
else:
|
|
self._dbusservice['/MCU/Temperature'] = int(data['tmp'])
|
|
|
|
# carState, null if internal error (Unknown/Error=0, Idle=1, Charging=2, WaitCar=3, Complete=4, Error=5)
|
|
# status 0=Disconnected; 1=Connected; 2=Charging; 3=Charged; 4=Waiting for sun; 5=Waiting for RFID; 6=Waiting for start; 7=Low SOC; 8=Ground fault; 9=Welded contacts; 10=CP Input shorted; 11=Residual current detected; 12=Under voltage detected; 13=Overvoltage detected; 14=Overheating detected
|
|
status = 0
|
|
if int(data['car']) == 1:
|
|
status = 0
|
|
elif int(data['car']) == 2:
|
|
status = 2
|
|
elif int(data['car']) == 3:
|
|
status = 6
|
|
elif int(data['car']) == 4:
|
|
status = 3
|
|
self._dbusservice['/Status'] = status
|
|
|
|
#logging
|
|
logging.debug("Wallbox Consumption (/Ac/Power): %s" % (self._dbusservice['/Ac/Power']))
|
|
logging.debug("Wallbox Forward (/Ac/Energy/Forward): %s" % (self._dbusservice['/Ac/Energy/Forward']))
|
|
logging.debug("---")
|
|
|
|
# increment UpdateIndex - to show that new data is available
|
|
index = self._dbusservice['/UpdateIndex'] + 1 # increment index
|
|
if index > 255: # maximum value of the index
|
|
index = 0 # overflow from 255 to 0
|
|
self._dbusservice['/UpdateIndex'] = index
|
|
|
|
#update lastupdate vars
|
|
self._lastUpdate = time.time()
|
|
else:
|
|
logging.debug("Wallbox is not available")
|
|
|
|
except Exception as e:
|
|
logging.critical('Error at %s', '_update', exc_info=e)
|
|
|
|
# return true, otherwise add_timeout will be removed from GObject - see docs http://library.isr.ist.utl.pt/docs/pygtk2reference/gobject-functions.html#function-gobject--timeout-add
|
|
return True
|
|
|
|
def _handlechangedvalue(self, path, value):
|
|
logging.info("someone else updated %s to %s" % (path, value))
|
|
|
|
if path == '/SetCurrent':
|
|
return self._setGoeChargerValue('amp', value)
|
|
elif path == '/StartStop':
|
|
return self._setGoeChargerValue('alw', value)
|
|
elif path == '/MaxCurrent':
|
|
return self._setGoeChargerValue('ama', value)
|
|
else:
|
|
logging.info("mapping for evcharger path %s does not exist" % (path))
|
|
return False
|
|
|
|
|
|
def main():
|
|
#configure logging
|
|
config = configparser.ConfigParser()
|
|
config.read(f"{(os.path.dirname(os.path.realpath(__file__)))}/config.ini")
|
|
logging_level = config["DEFAULT"]["Logging"].upper()
|
|
|
|
logging.basicConfig( format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S',
|
|
level=logging_level,
|
|
handlers=[
|
|
RotatingFileHandler("%s/current.log" % (os.path.dirname(os.path.realpath(__file__))), maxBytes=10000),
|
|
logging.StreamHandler()
|
|
])
|
|
|
|
try:
|
|
logging.info("Start")
|
|
|
|
from dbus.mainloop.glib import DBusGMainLoop
|
|
# Have a mainloop, so we can send/receive asynchronous calls to and from dbus
|
|
DBusGMainLoop(set_as_default=True)
|
|
|
|
#formatting
|
|
_kwh = lambda p, v: (str(round(v, 2)) + 'kWh')
|
|
_a = lambda p, v: (str(round(v, 1)) + 'A')
|
|
_w = lambda p, v: (str(round(v, 1)) + 'W')
|
|
_v = lambda p, v: (str(round(v, 1)) + 'V')
|
|
_degC = lambda p, v: (str(v) + '°C')
|
|
_s = lambda p, v: (str(v) + 's')
|
|
|
|
#start our main-service
|
|
pvac_output = DbusGoeChargerService(
|
|
servicename='com.victronenergy.evcharger',
|
|
paths={
|
|
'/Ac/Power': {'initial': 0, 'textformat': _w},
|
|
'/Ac/L1/Power': {'initial': 0, 'textformat': _w},
|
|
'/Ac/L2/Power': {'initial': 0, 'textformat': _w},
|
|
'/Ac/L3/Power': {'initial': 0, 'textformat': _w},
|
|
'/Ac/Energy/Forward': {'initial': 0, 'textformat': _kwh},
|
|
'/ChargingTime': {'initial': 0, 'textformat': _s},
|
|
|
|
'/Ac/Voltage': {'initial': 0, 'textformat': _v},
|
|
'/Current': {'initial': 0, 'textformat': _a},
|
|
'/SetCurrent': {'initial': 0, 'textformat': _a},
|
|
'/MaxCurrent': {'initial': 0, 'textformat': _a},
|
|
'/MCU/Temperature': {'initial': 0, 'textformat': _degC},
|
|
'/StartStop': {'initial': 0, 'textformat': lambda p, v: (str(v))}
|
|
}
|
|
)
|
|
|
|
logging.info('Connected to dbus, and switching over to gobject.MainLoop() (= event based)')
|
|
mainloop = gobject.MainLoop()
|
|
mainloop.run()
|
|
except Exception as e:
|
|
logging.critical('Error at %s', 'main', exc_info=e)
|
|
if __name__ == "__main__":
|
|
main()
|