mirror of
				https://github.com/espressif/esp-idf.git
				synced 2025-11-04 09:01:40 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			713 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			713 lines
		
	
	
		
			28 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python
 | 
						|
#
 | 
						|
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#     http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
#
 | 
						|
 | 
						|
# DBus-Bluez BLE library
 | 
						|
 | 
						|
from __future__ import print_function
 | 
						|
 | 
						|
import sys
 | 
						|
import time
 | 
						|
 | 
						|
try:
 | 
						|
    import dbus
 | 
						|
    import dbus.mainloop.glib
 | 
						|
    from gi.repository import GLib
 | 
						|
except ImportError as e:
 | 
						|
    if 'linux' not in sys.platform:
 | 
						|
        raise e
 | 
						|
    print(e)
 | 
						|
    print('Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue')
 | 
						|
    print('Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue')
 | 
						|
    raise
 | 
						|
 | 
						|
from . import lib_gap, lib_gatt
 | 
						|
 | 
						|
BLUEZ_SERVICE_NAME = 'org.bluez'
 | 
						|
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
 | 
						|
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
 | 
						|
 | 
						|
ADAPTER_IFACE = 'org.bluez.Adapter1'
 | 
						|
DEVICE_IFACE = 'org.bluez.Device1'
 | 
						|
 | 
						|
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
 | 
						|
LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
 | 
						|
 | 
						|
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
 | 
						|
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
 | 
						|
 | 
						|
 | 
						|
class DBusException(dbus.exceptions.DBusException):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class Characteristic:
 | 
						|
    def __init__(self):
 | 
						|
        self.iface = None
 | 
						|
        self.path = None
 | 
						|
        self.props = None
 | 
						|
 | 
						|
 | 
						|
class Service:
 | 
						|
    def __init__(self):
 | 
						|
        self.iface = None
 | 
						|
        self.path = None
 | 
						|
        self.props = None
 | 
						|
        self.chars = []
 | 
						|
 | 
						|
 | 
						|
class Device:
 | 
						|
    def __init__(self):
 | 
						|
        self.iface = None
 | 
						|
        self.path = None
 | 
						|
        self.props = None
 | 
						|
        self.name = None
 | 
						|
        self.addr = None
 | 
						|
        self.services = []
 | 
						|
 | 
						|
 | 
						|
class Adapter:
 | 
						|
    def __init__(self):
 | 
						|
        self.iface = None
 | 
						|
        self.path = None
 | 
						|
        self.props = None
 | 
						|
 | 
						|
 | 
						|
class BLE_Bluez_Client:
 | 
						|
    def __init__(self, iface=None):
 | 
						|
        self.bus = None
 | 
						|
        self.hci_iface = iface
 | 
						|
        self.adapter = Adapter()
 | 
						|
        self.device = None
 | 
						|
        self.gatt_app = None
 | 
						|
        self.gatt_mgr = None
 | 
						|
        self.mainloop = None
 | 
						|
        self.loop_cnt = 0
 | 
						|
 | 
						|
        try:
 | 
						|
            dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 | 
						|
            self.bus = dbus.SystemBus()
 | 
						|
        except dbus.exceptions.DBusException as dbus_err:
 | 
						|
            raise DBusException('Failed to initialise client: {}'.format(dbus_err))
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to initialise client: {}'.format(err))
 | 
						|
 | 
						|
    def __del__(self):
 | 
						|
        try:
 | 
						|
            # Cleanup
 | 
						|
            self.disconnect()
 | 
						|
            print('Test Exit')
 | 
						|
        except Exception as e:
 | 
						|
            print(e)
 | 
						|
 | 
						|
    def set_adapter(self):
 | 
						|
        '''
 | 
						|
            Discover Bluetooth Adapter
 | 
						|
            Power On Bluetooth Adapter
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            print('discovering adapter')
 | 
						|
            dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
 | 
						|
            dbus_objs = dbus_obj_mgr.GetManagedObjects()
 | 
						|
            for path, interfaces in dbus_objs.items():
 | 
						|
                adapter = interfaces.get(ADAPTER_IFACE)
 | 
						|
                if adapter is not None and path.endswith(self.hci_iface):
 | 
						|
                    self.adapter.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), ADAPTER_IFACE)
 | 
						|
                    self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
 | 
						|
                    self.adapter.path = path
 | 
						|
                    break
 | 
						|
 | 
						|
            if self.adapter.iface is None:
 | 
						|
                print('bluetooth adapter not found')
 | 
						|
                return False
 | 
						|
 | 
						|
            print('bluetooth adapter discovered')
 | 
						|
            print('checking if bluetooth adapter is already powered on')
 | 
						|
            # Check if adapter is already powered on
 | 
						|
            powered = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
 | 
						|
            if powered == 1:
 | 
						|
                print('adapter already powered on')
 | 
						|
                return True
 | 
						|
            # Power On Adapter
 | 
						|
            print('powering on adapter')
 | 
						|
            self.adapter.props.Set(ADAPTER_IFACE, 'Powered', dbus.Boolean(1))
 | 
						|
            # Check if adapter is powered on
 | 
						|
            print('checking if adapter is powered on')
 | 
						|
            for cnt in range(10, 0, -1):
 | 
						|
                time.sleep(5)
 | 
						|
                powered_on = self.adapter.props.Get(ADAPTER_IFACE, 'Powered')
 | 
						|
                if powered_on == 1:
 | 
						|
                    # Set adapter props again with powered on value
 | 
						|
                    self.adapter.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), DBUS_PROP_IFACE)
 | 
						|
                    print('bluetooth adapter powered on')
 | 
						|
                    return True
 | 
						|
                print('number of retries left({})'.format(cnt - 1))
 | 
						|
 | 
						|
            # Adapter not powered on
 | 
						|
            print('bluetooth adapter not powered on')
 | 
						|
            return False
 | 
						|
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to set adapter: {}'.format(err))
 | 
						|
 | 
						|
    def connect(self, devname=None, devaddr=None):
 | 
						|
        '''
 | 
						|
            Start Discovery and Connect to the device
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            device_found = None
 | 
						|
            start_discovery = False
 | 
						|
            self.device = Device()
 | 
						|
 | 
						|
            discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
 | 
						|
            # Start Discovery
 | 
						|
            if discovery_val == 0:
 | 
						|
                print('starting discovery')
 | 
						|
                self.adapter.iface.StartDiscovery()
 | 
						|
                start_discovery = True
 | 
						|
 | 
						|
                for cnt in range(10, 0, -1):
 | 
						|
                    time.sleep(5)
 | 
						|
                    discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
 | 
						|
                    if discovery_val == 1:
 | 
						|
                        print('start discovery successful')
 | 
						|
                        break
 | 
						|
                    print('number of retries left ({})'.format(cnt - 1))
 | 
						|
 | 
						|
                if discovery_val == 0:
 | 
						|
                    print('start discovery failed')
 | 
						|
                    return False
 | 
						|
 | 
						|
            # Get device
 | 
						|
            for cnt in range(10, 0, -1):
 | 
						|
                # Wait for device to be discovered
 | 
						|
                time.sleep(5)
 | 
						|
                device_found = self.get_device(
 | 
						|
                    devname=devname,
 | 
						|
                    devaddr=devaddr)
 | 
						|
                if device_found:
 | 
						|
                    break
 | 
						|
                # Retry
 | 
						|
                print('number of retries left ({})'.format(cnt - 1))
 | 
						|
 | 
						|
            if not device_found:
 | 
						|
                print('expected device {} [ {} ] not found'.format(devname, devaddr))
 | 
						|
                return False
 | 
						|
 | 
						|
            # Connect to expected device found
 | 
						|
            print('connecting to device {} [ {} ] '.format(self.device.name, self.device.addr))
 | 
						|
            self.device.iface.Connect(dbus_interface=DEVICE_IFACE)
 | 
						|
            for cnt in range(10, 0, -1):
 | 
						|
                time.sleep(5)
 | 
						|
                connected = self.device.props.Get(DEVICE_IFACE, 'Connected')
 | 
						|
                if connected == 1:
 | 
						|
                    # Set device props again with connected on value
 | 
						|
                    self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.device.path), DBUS_PROP_IFACE)
 | 
						|
                    print('connected to device with iface {}'.format(self.device.path))
 | 
						|
                    return True
 | 
						|
                print('number of retries left({})'.format(cnt - 1))
 | 
						|
 | 
						|
            # Device not connected
 | 
						|
            print('connection to device failed')
 | 
						|
            return False
 | 
						|
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Connect to device failed : {}'.format(err))
 | 
						|
        finally:
 | 
						|
            try:
 | 
						|
                if start_discovery:
 | 
						|
                    print('stopping discovery')
 | 
						|
                    self.adapter.iface.StopDiscovery()
 | 
						|
                    for cnt in range(10, 0, -1):
 | 
						|
                        time.sleep(5)
 | 
						|
                        discovery_val = self.adapter.props.Get(ADAPTER_IFACE, 'Discovering')
 | 
						|
                        if discovery_val == 0:
 | 
						|
                            print('stop discovery successful')
 | 
						|
                            break
 | 
						|
                        print('number of retries left ({})'.format(cnt - 1))
 | 
						|
                    if discovery_val == 1:
 | 
						|
                        print('stop discovery failed')
 | 
						|
            except dbus.exceptions.DBusException as dbus_err:
 | 
						|
                print('Warning: Failure during cleanup for device connection : {}'.format(dbus_err))
 | 
						|
 | 
						|
    def get_device(self, devname=None, devaddr=None):
 | 
						|
        '''
 | 
						|
            Get device based on device name
 | 
						|
            and device address and connect to device
 | 
						|
        '''
 | 
						|
        dev_path = None
 | 
						|
        expected_device_addr = devaddr.lower()
 | 
						|
        expected_device_name = devname.lower()
 | 
						|
 | 
						|
        print('checking if expected device {} [ {} ] is present'.format(devname, devaddr))
 | 
						|
 | 
						|
        dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
 | 
						|
        dbus_objs = dbus_obj_mgr.GetManagedObjects()
 | 
						|
 | 
						|
        # Check if expected device is present
 | 
						|
        for path, interfaces in dbus_objs.items():
 | 
						|
            if DEVICE_IFACE not in interfaces.keys():
 | 
						|
                continue
 | 
						|
 | 
						|
            # Check expected device address is received device address
 | 
						|
            received_device_addr_path = (path.replace('_', ':')).lower()
 | 
						|
            if expected_device_addr not in received_device_addr_path:
 | 
						|
                continue
 | 
						|
 | 
						|
            device_props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
 | 
						|
            received_device_name = device_props.Get(DEVICE_IFACE, 'Name').lower()
 | 
						|
 | 
						|
            # Check expected device name is received device name
 | 
						|
            if expected_device_name == received_device_name:
 | 
						|
                # Set device iface path
 | 
						|
                dev_path = path
 | 
						|
                break
 | 
						|
 | 
						|
        if not dev_path:
 | 
						|
            print('\nBLE device not found')
 | 
						|
            return False
 | 
						|
 | 
						|
        print('device {} [ {} ] found'.format(devname, devaddr))
 | 
						|
 | 
						|
        # Set device details
 | 
						|
        self.device.iface = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DEVICE_IFACE)
 | 
						|
        self.device.props = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
 | 
						|
        self.device.path = dev_path
 | 
						|
        self.device.name = devname
 | 
						|
        self.device.addr = devaddr
 | 
						|
        return True
 | 
						|
 | 
						|
    def get_services(self):
 | 
						|
        '''
 | 
						|
        Retrieve Services found in the device connected
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            # Get current dbus objects
 | 
						|
            dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
 | 
						|
            dbus_objs = dbus_obj_mgr.GetManagedObjects()
 | 
						|
 | 
						|
            # Get services
 | 
						|
            for path, interfaces in dbus_objs.items():
 | 
						|
                if GATT_SERVICE_IFACE in interfaces.keys():
 | 
						|
                    if not path.startswith(self.device.path):
 | 
						|
                        continue
 | 
						|
                    received_service = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
 | 
						|
                    # Retrieve all services on device iface path
 | 
						|
                    # and set each service received
 | 
						|
                    service = Service()
 | 
						|
                    service.path = path
 | 
						|
                    service.iface = dbus.Interface(received_service, GATT_SERVICE_IFACE)
 | 
						|
                    service.props = dbus.Interface(received_service, DBUS_PROP_IFACE)
 | 
						|
                    self.device.services.append(service)
 | 
						|
 | 
						|
            if not self.device.services:
 | 
						|
                print('no services found for device: {}'.format(self.device.path))
 | 
						|
                return False
 | 
						|
 | 
						|
            return True
 | 
						|
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to get services: {}'.format(err))
 | 
						|
 | 
						|
    def get_chars(self):
 | 
						|
        '''
 | 
						|
            Get characteristics of the services set for the device connected
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            if not self.device.services:
 | 
						|
                print('No services set for device: {}'.format(self.device.path))
 | 
						|
                return
 | 
						|
 | 
						|
            # Read chars for all the services received for device
 | 
						|
            for service in self.device.services:
 | 
						|
                char_found = False
 | 
						|
                dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
 | 
						|
                dbus_objs = dbus_obj_mgr.GetManagedObjects()
 | 
						|
                for path, interfaces in dbus_objs.items():
 | 
						|
                    if GATT_CHRC_IFACE in interfaces.keys():
 | 
						|
                        if not path.startswith(self.device.path):
 | 
						|
                            continue
 | 
						|
                        if not path.startswith(service.path):
 | 
						|
                            continue
 | 
						|
                        # Set characteristics
 | 
						|
                        received_char = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
 | 
						|
                        char = Characteristic()
 | 
						|
                        char.path = path
 | 
						|
                        char.iface = dbus.Interface(received_char, GATT_CHRC_IFACE)
 | 
						|
                        char.props = dbus.Interface(received_char, DBUS_PROP_IFACE)
 | 
						|
                        service.chars.append(char)
 | 
						|
                        char_found = True
 | 
						|
 | 
						|
                if not char_found:
 | 
						|
                    print('Characteristic not found for service: {}'.format(service.iface))
 | 
						|
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to get characteristics : {}'.format(err))
 | 
						|
 | 
						|
    def read_chars(self):
 | 
						|
        '''
 | 
						|
            Read value of characteristics
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            if not self.device.services:
 | 
						|
                print('No services set for device: {}'.format(self.device.path))
 | 
						|
                return
 | 
						|
 | 
						|
            # Read chars for all services of device
 | 
						|
            for service in self.device.services:
 | 
						|
                # Read properties of characteristic
 | 
						|
                for char in service.chars:
 | 
						|
                    # Print path
 | 
						|
                    print('Characteristic: {}'.format(char.path))
 | 
						|
                    # Print uuid
 | 
						|
                    uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
 | 
						|
                    print('UUID: {}'.format(uuid))
 | 
						|
                    # Print flags
 | 
						|
                    flags = [flag for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
 | 
						|
                    print('Flags: {}'.format(flags))
 | 
						|
                    # Read value if `read` flag is present
 | 
						|
                    if 'read' in flags:
 | 
						|
                        value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
 | 
						|
                        print('Value: {}'.format(value))
 | 
						|
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to read characteristics : {}'.format(err))
 | 
						|
 | 
						|
    def write_chars(self, new_value):
 | 
						|
        '''
 | 
						|
            Write to characteristics
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            if not self.device.services:
 | 
						|
                print('No services set for device: {}'.format(self.device.path))
 | 
						|
                return False
 | 
						|
 | 
						|
            print('writing data to characteristics with read and write permission')
 | 
						|
            # Read chars of all services of device
 | 
						|
            for service in self.device.services:
 | 
						|
                if not service.chars:
 | 
						|
                    print('No chars found for service: {}'.format(service.path))
 | 
						|
                    continue
 | 
						|
                for char in service.chars:
 | 
						|
                    flags = [flag.lower() for flag in char.props.Get(GATT_CHRC_IFACE, 'Flags')]
 | 
						|
                    if not ('read' in flags and 'write' in flags):
 | 
						|
                        continue
 | 
						|
 | 
						|
                    # Write new value to characteristic
 | 
						|
                    curr_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
 | 
						|
                    print('current value: {}'.format(curr_value))
 | 
						|
 | 
						|
                    print('writing {} to characteristic {}'.format(new_value, char.path))
 | 
						|
                    char.iface.WriteValue(new_value, {}, dbus_interface=GATT_CHRC_IFACE)
 | 
						|
 | 
						|
                    time.sleep(5)
 | 
						|
                    updated_value = char.iface.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
 | 
						|
                    print('updated value: {}'.format(updated_value))
 | 
						|
 | 
						|
                    if not (ord(new_value) == int(updated_value[0])):
 | 
						|
                        print('write operation to {} failed'.format(char.path))
 | 
						|
                        return False
 | 
						|
                    print('write operation to {} successful'.format(char.path))
 | 
						|
                    return True
 | 
						|
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to write to characteristics: {}'.format(err))
 | 
						|
 | 
						|
    def get_char_if_exists(self, char_uuid):
 | 
						|
        '''
 | 
						|
            Get char if exists for given uuid
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            for service in self.device.services:
 | 
						|
                for char in service.chars:
 | 
						|
                    curr_uuid = char.props.Get(GATT_CHRC_IFACE, 'UUID')
 | 
						|
                    if char_uuid.lower() in curr_uuid.lower():
 | 
						|
                        return char
 | 
						|
            print('char {} not found'.format(char_uuid))
 | 
						|
            return False
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to get char based on uuid {} - {}'.format(char_uuid, err))
 | 
						|
 | 
						|
    def get_service_if_exists(self, service_uuid):
 | 
						|
        try:
 | 
						|
            for service in self.device.services:
 | 
						|
                uuid = service.props.Get(GATT_SERVICE_IFACE, 'UUID')
 | 
						|
                if service_uuid.lower() in uuid.lower():
 | 
						|
                    return service
 | 
						|
            print('service {} not found'.format(service_uuid))
 | 
						|
            return False
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to get service based on uuid {} - {}'.format(service_uuid, err))
 | 
						|
 | 
						|
    def start_notify(self, char):
 | 
						|
        try:
 | 
						|
            notify_started = 0
 | 
						|
            notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
 | 
						|
            if notifying == 0:
 | 
						|
                # Start Notify
 | 
						|
                char.iface.StartNotify()
 | 
						|
                notify_started = 1
 | 
						|
                # Check notify started
 | 
						|
                for _ in range(10, 0, -1):
 | 
						|
                    notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
 | 
						|
                    if notifying == 1:
 | 
						|
                        print('subscribe to notifications: on')
 | 
						|
                        break
 | 
						|
                if notifying == 0:
 | 
						|
                    print('Failed to start notifications')
 | 
						|
                    return False
 | 
						|
 | 
						|
            # Get updated value
 | 
						|
            for _ in range(10, 0, -1):
 | 
						|
                time.sleep(1)
 | 
						|
                char_value = char.props.Get(GATT_CHRC_IFACE, 'Value')
 | 
						|
                print(char_value)
 | 
						|
 | 
						|
            return None
 | 
						|
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to perform notification operation: {}'.format(err))
 | 
						|
        finally:
 | 
						|
            try:
 | 
						|
                if notify_started == 1:
 | 
						|
                    # Stop notify
 | 
						|
                    char.iface.StopNotify()
 | 
						|
                    for _ in range(10, 0, -1):
 | 
						|
                        notifying = char.props.Get(GATT_CHRC_IFACE, 'Notifying')
 | 
						|
                        if notifying == 0:
 | 
						|
                            print('subscribe to notifications: off')
 | 
						|
                            break
 | 
						|
                    if notifying == 1:
 | 
						|
                        print('Failed to stop notifications')
 | 
						|
            except dbus.exceptions.DBusException as dbus_err:
 | 
						|
                print('Warning: Failure during cleanup for start notify : {}'.format(dbus_err))
 | 
						|
 | 
						|
    def _create_mainloop(self):
 | 
						|
        '''
 | 
						|
            Create GLibMainLoop
 | 
						|
        '''
 | 
						|
        if not self.mainloop:
 | 
						|
            self.mainloop = GLib.MainLoop()
 | 
						|
 | 
						|
    def register_gatt_app(self):
 | 
						|
        '''
 | 
						|
            Create Gatt Application
 | 
						|
            Register Gatt Application
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            # Create mainloop, if does not exist
 | 
						|
            self._create_mainloop()
 | 
						|
 | 
						|
            # Create Gatt Application
 | 
						|
            self.gatt_app = lib_gatt.AlertNotificationApp(self.bus, self.adapter.path)
 | 
						|
            print('GATT Application created')
 | 
						|
            self.gatt_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, self.adapter.path), GATT_MANAGER_IFACE)
 | 
						|
 | 
						|
            # Register Gatt Application
 | 
						|
            self.gatt_mgr.RegisterApplication(
 | 
						|
                self.gatt_app, {},
 | 
						|
                reply_handler=self.gatt_app_success_handler,
 | 
						|
                error_handler=self.gatt_app_error_handler)
 | 
						|
            self.mainloop.run()
 | 
						|
 | 
						|
        except dbus.exceptions.DBusException as dbus_err:
 | 
						|
            raise DBusException('Failed to create GATT Application : {}'.format(dbus_err))
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to register Gatt Application: {}'.format(err))
 | 
						|
 | 
						|
    def gatt_app_success_handler(self):
 | 
						|
        print('GATT Application successfully registered')
 | 
						|
        self.mainloop.quit()
 | 
						|
 | 
						|
    def gatt_app_error_handler(self):
 | 
						|
        raise DBusException('Failed to register GATT Application')
 | 
						|
 | 
						|
    def check_le_iface(self):
 | 
						|
        '''
 | 
						|
            Check if LEAdvertisingManager1 interface exists
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            dbus_obj_mgr = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
 | 
						|
            dbus_objs = dbus_obj_mgr.GetManagedObjects()
 | 
						|
            for path, iface in dbus_objs.items():
 | 
						|
                if LE_ADVERTISING_MANAGER_IFACE in iface:
 | 
						|
                    le_adv_iface_path = path
 | 
						|
                    break
 | 
						|
            # Check LEAdvertisingManager1 interface is found
 | 
						|
            assert le_adv_iface_path, '\n Cannot start advertising. LEAdvertisingManager1 Interface not found'
 | 
						|
 | 
						|
            return le_adv_iface_path
 | 
						|
 | 
						|
        except AssertionError:
 | 
						|
            raise
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to find LEAdvertisingManager1 interface: {}'.format(err))
 | 
						|
 | 
						|
    def register_adv(self, adv_host_name, adv_type, adv_uuid):
 | 
						|
        try:
 | 
						|
            # Gatt Application is expected to be registered
 | 
						|
            if not self.gatt_app:
 | 
						|
                print('No Gatt Application is registered')
 | 
						|
                return
 | 
						|
 | 
						|
            adv_iface_index = 0
 | 
						|
 | 
						|
            # Create mainloop, if does not exist
 | 
						|
            self._create_mainloop()
 | 
						|
 | 
						|
            # Check LEAdvertisingManager1 interface exists
 | 
						|
            le_iface_path = self.check_le_iface()
 | 
						|
 | 
						|
            # Create Advertisement data
 | 
						|
            leadv_obj = lib_gap.Advertisement(
 | 
						|
                self.bus,
 | 
						|
                adv_iface_index,
 | 
						|
                adv_type,
 | 
						|
                adv_uuid,
 | 
						|
                adv_host_name)
 | 
						|
            print('Advertisement registered')
 | 
						|
 | 
						|
            # Register Advertisement
 | 
						|
            leadv_mgr_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_iface_path), LE_ADVERTISING_MANAGER_IFACE)
 | 
						|
            leadv_mgr_iface_obj.RegisterAdvertisement(
 | 
						|
                leadv_obj.get_path(), {},
 | 
						|
                reply_handler=self.adv_success_handler,
 | 
						|
                error_handler=self.adv_error_handler)
 | 
						|
 | 
						|
            # Handler to read events received and exit from mainloop
 | 
						|
            GLib.timeout_add_seconds(3, self.check_adv)
 | 
						|
 | 
						|
            self.mainloop.run()
 | 
						|
 | 
						|
        except AssertionError:
 | 
						|
            raise
 | 
						|
        except dbus.exceptions.DBusException as dbus_err:
 | 
						|
            raise DBusException('Failure during registering advertisement : {}'.format(dbus_err))
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failure during registering advertisement : {}'.format(err))
 | 
						|
        else:
 | 
						|
            try:
 | 
						|
                try:
 | 
						|
                    # Stop Notify if not already stopped
 | 
						|
                    chars = self.gatt_app.service.get_characteristics()
 | 
						|
                    for char in chars:
 | 
						|
                        if char.uuid == lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID']:
 | 
						|
                            if char.notifying:
 | 
						|
                                char.StopNotify()
 | 
						|
                except dbus.exceptions.DBusException as dbus_err:
 | 
						|
                    print('Warning: {}'.format(dbus_err))
 | 
						|
 | 
						|
                try:
 | 
						|
                    # Unregister Advertisement
 | 
						|
                    leadv_mgr_iface_obj.UnregisterAdvertisement(leadv_obj.get_path())
 | 
						|
                except dbus.exceptions.DBusException as dbus_err:
 | 
						|
                    print('Warning: {}'.format(dbus_err))
 | 
						|
 | 
						|
                try:
 | 
						|
                    # Remove advertising data
 | 
						|
                    dbus.service.Object.remove_from_connection(leadv_obj)
 | 
						|
                except LookupError as err:
 | 
						|
                    print('Warning: Failed to remove connection from dbus for advertisement object: {} - {}'.format(leadv_obj, err))
 | 
						|
 | 
						|
                try:
 | 
						|
                    # Unregister Gatt Application
 | 
						|
                    self.gatt_mgr.UnregisterApplication(self.gatt_app.get_path())
 | 
						|
                except dbus.exceptions.DBusException as dbus_err:
 | 
						|
                    print('Warning: {}'.format(dbus_err))
 | 
						|
 | 
						|
                try:
 | 
						|
                    # Remove Gatt Application
 | 
						|
                    dbus.service.Object.remove_from_connection(self.gatt_app)
 | 
						|
                except LookupError as err:
 | 
						|
                    print('Warning: Failed to remove connection from dbus for Gatt application object: {} - {}'.format(self.gatt_app, err))
 | 
						|
 | 
						|
            except RuntimeError as err:
 | 
						|
                print('Warning: Failure during cleanup of Advertisement: {}'.format(err))
 | 
						|
 | 
						|
    def adv_success_handler(self):
 | 
						|
        print('Registered Advertisement successfully')
 | 
						|
 | 
						|
    def adv_error_handler(self, err):
 | 
						|
        raise DBusException('{}'.format(err))
 | 
						|
 | 
						|
    def check_adv(self):
 | 
						|
        '''
 | 
						|
            Handler to check for events triggered (read/write/subscribe)
 | 
						|
            for advertisement registered for AlertNotificationApp
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            retry = 10
 | 
						|
            # Exit loop if read and write and subscribe is successful
 | 
						|
            if self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['SUPPORT_NEW_ALERT_UUID'], 'read') and \
 | 
						|
                self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['ALERT_NOTIF_UUID'], 'write') and \
 | 
						|
                    self.gatt_app.service.get_char_status(lib_gatt.CHAR_UUIDS['UNREAD_ALERT_STATUS_UUID'], 'notify'):
 | 
						|
                if self.mainloop.is_running():
 | 
						|
                    self.mainloop.quit()
 | 
						|
                    # return False to stop polling
 | 
						|
                    return False
 | 
						|
 | 
						|
            self.loop_cnt += 1
 | 
						|
            print('Check read/write/subscribe events are received...Retry {}'.format(self.loop_cnt))
 | 
						|
 | 
						|
            # Exit loop if max retry value is reached and
 | 
						|
            # all three events (read and write and subscribe) have not yet passed
 | 
						|
            # Retry total 10 times
 | 
						|
            if self.loop_cnt == (retry - 1):
 | 
						|
                if self.mainloop.is_running():
 | 
						|
                    self.mainloop.quit()
 | 
						|
                    # return False to stop polling
 | 
						|
                    return False
 | 
						|
 | 
						|
            # return True to continue polling
 | 
						|
            return True
 | 
						|
 | 
						|
        except RuntimeError as err:
 | 
						|
            print('Failure in advertisment handler: {}'.format(err))
 | 
						|
            if self.mainloop.is_running():
 | 
						|
                self.mainloop.quit()
 | 
						|
                # return False to stop polling
 | 
						|
                return False
 | 
						|
 | 
						|
    def disconnect(self):
 | 
						|
        '''
 | 
						|
        Disconnect device
 | 
						|
        '''
 | 
						|
        try:
 | 
						|
            if not self.device or not self.device.iface:
 | 
						|
                return
 | 
						|
            print('disconnecting device')
 | 
						|
            # Disconnect device
 | 
						|
            device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
 | 
						|
            if device_conn == 1:
 | 
						|
                self.device.iface.Disconnect(dbus_interface=DEVICE_IFACE)
 | 
						|
                for cnt in range(10, 0, -1):
 | 
						|
                    time.sleep(5)
 | 
						|
                    device_conn = self.device.props.Get(DEVICE_IFACE, 'Connected')
 | 
						|
                    if device_conn == 0:
 | 
						|
                        print('device disconnected')
 | 
						|
                        break
 | 
						|
                    print('number of retries left ({})'.format(cnt - 1))
 | 
						|
                if device_conn == 1:
 | 
						|
                    print('failed to disconnect device')
 | 
						|
 | 
						|
                self.adapter.iface.RemoveDevice(self.device.iface)
 | 
						|
                self.device = None
 | 
						|
 | 
						|
        except dbus.exceptions.DBusException as dbus_err:
 | 
						|
            print('Warning: {}'.format(dbus_err))
 | 
						|
        except Exception as err:
 | 
						|
            raise Exception('Failed to disconnect device: {}'.format(err))
 |