diff --git a/tools/esp_prov/esp_prov.py b/tools/esp_prov/esp_prov.py index 0ea172da53..2f6c69c5e2 100644 --- a/tools/esp_prov/esp_prov.py +++ b/tools/esp_prov/esp_prov.py @@ -42,13 +42,19 @@ def get_transport(sel_transport, softap_endpoint=None, ble_devname=None): if (sel_transport == 'softap'): tp = transport.Transport_Softap(softap_endpoint) elif (sel_transport == 'ble'): - tp = transport.Transport_BLE(devname = ble_devname, - service_uuid = '0000ffff-0000-1000-8000-00805f9b34fb', - nu_lookup = { - 'prov-session': 'ff51', - 'prov-config' : 'ff52', - 'proto-ver' : 'ff53' - }) + # BLE client is now capable of automatically figuring out + # the primary service from the advertisement data and the + # characteristics corresponding to each endpoint. + # Below, the service_uuid field and 16bit UUIDs in the nu_lookup + # table are provided only to support devices running older firmware, + # in which case, the automated discovery will fail and the client + # will fallback to using the provided UUIDs instead + tp = transport.Transport_BLE(devname=ble_devname, + service_uuid='0000ffff-0000-1000-8000-00805f9b34fb', + nu_lookup={'prov-session': 'ff51', + 'prov-config': 'ff52', + 'proto-ver': 'ff53' + }) elif (sel_transport == 'console'): tp = transport.Transport_Console() return tp diff --git a/tools/esp_prov/transport/ble_cli.py b/tools/esp_prov/transport/ble_cli.py index d8b2dff882..0ff18c17f3 100644 --- a/tools/esp_prov/transport/ble_cli.py +++ b/tools/esp_prov/transport/ble_cli.py @@ -15,6 +15,7 @@ from __future__ import print_function from builtins import input +from future.utils import iteritems import platform @@ -37,20 +38,24 @@ if platform.system() == 'Linux': # BLE client (Linux Only) using Bluez and DBus class BLE_Bluez_Client: - def connect(self, devname, iface, srv_uuid): + def connect(self, devname, iface, chrc_names, fallback_srv_uuid): self.devname = devname - self.srv_uuid = srv_uuid + self.srv_uuid_fallback = fallback_srv_uuid + self.chrc_names = [name.lower() for name in chrc_names] self.device = None self.adapter = None self.adapter_props = None self.services = None + self.nu_lookup = None + self.characteristics = dict() + self.srv_uuid_adv = None dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() - for path, interfaces in objects.items(): + for path, interfaces in iteritems(objects): adapter = interfaces.get("org.bluez.Adapter1") if adapter != None: if path.endswith(iface): @@ -91,8 +96,8 @@ class BLE_Bluez_Client: manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() dev_path = None - for path, interfaces in objects.items(): - if "org.bluez.Device1" not in interfaces.keys(): + for path, interfaces in iteritems(objects): + if "org.bluez.Device1" not in interfaces: continue if interfaces["org.bluez.Device1"].get("Name") == self.devname: dev_path = path @@ -103,6 +108,19 @@ class BLE_Bluez_Client: try: self.device = bus.get_object("org.bluez", dev_path) + try: + uuids = self.device.Get('org.bluez.Device1', 'UUIDs', + dbus_interface='org.freedesktop.DBus.Properties') + # There should be 1 service UUID in advertising data + # If bluez had cached an old version of the advertisement data + # the list of uuids may be incorrect, in which case connection + # or service discovery may fail the first time. If that happens + # the cache will be refreshed before next retry + if len(uuids) == 1: + self.srv_uuid_adv = uuids[0] + except dbus.exceptions.DBusException as e: + print(e) + self.device.Connect(dbus_interface='org.bluez.Device1') except Exception as e: print(e) @@ -113,33 +131,84 @@ class BLE_Bluez_Client: bus = dbus.SystemBus() manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager") objects = manager.GetManagedObjects() - srv_path = None - for path, interfaces in objects.items(): - if "org.bluez.GattService1" not in interfaces.keys(): + service_found = False + for srv_path, srv_interfaces in iteritems(objects): + if "org.bluez.GattService1" not in srv_interfaces: continue - if path.startswith(self.device.object_path): - service = bus.get_object("org.bluez", path) - uuid = service.Get('org.bluez.GattService1', 'UUID', - dbus_interface='org.freedesktop.DBus.Properties') - if uuid == self.srv_uuid: - srv_path = path + if not srv_path.startswith(self.device.object_path): + continue + service = bus.get_object("org.bluez", srv_path) + srv_uuid = service.Get('org.bluez.GattService1', 'UUID', + dbus_interface='org.freedesktop.DBus.Properties') + + # If service UUID doesn't match the one found in advertisement data + # then also check if it matches the fallback UUID + if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]: + continue + + nu_lookup = dict() + characteristics = dict() + for chrc_path, chrc_interfaces in iteritems(objects): + if "org.bluez.GattCharacteristic1" not in chrc_interfaces: + continue + if not chrc_path.startswith(service.object_path): + continue + chrc = bus.get_object("org.bluez", chrc_path) + uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID', + dbus_interface='org.freedesktop.DBus.Properties') + characteristics[uuid] = chrc + for desc_path, desc_interfaces in iteritems(objects): + if "org.bluez.GattDescriptor1" not in desc_interfaces: + continue + if not desc_path.startswith(chrc.object_path): + continue + desc = bus.get_object("org.bluez", desc_path) + desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID', + dbus_interface='org.freedesktop.DBus.Properties') + if desc_uuid[4:8] != '2901': + continue + try: + readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1') + except dbus.exceptions.DBusException: + break + found_name = ''.join(chr(b) for b in readval).lower() + nu_lookup[found_name] = uuid break - if srv_path == None: + match_found = True + for name in self.chrc_names: + if name not in nu_lookup: + # Endpoint name not present + match_found = False + break + + # Create lookup table only if all endpoint names found + self.nu_lookup = [None, nu_lookup][match_found] + self.characteristics = characteristics + service_found = True + + # If the service UUID matches that in the advertisement + # we can stop the search now. If it doesn't match, we + # have found the service corresponding to the fallback + # UUID, in which case don't break and keep searching + # for the advertised service + if srv_uuid == self.srv_uuid_adv: + break + + if not service_found: + self.device.Disconnect(dbus_interface='org.bluez.Device1') + if self.adapter: + self.adapter.RemoveDevice(self.device) + self.device = None + self.nu_lookup = None + self.characteristics = dict() raise RuntimeError("Provisioning service not found") - self.characteristics = dict() - for path, interfaces in objects.items(): - if "org.bluez.GattCharacteristic1" not in interfaces.keys(): - continue - if path.startswith(srv_path): - chrc = bus.get_object("org.bluez", path) - uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID', - dbus_interface='org.freedesktop.DBus.Properties') - self.characteristics[uuid] = chrc + def get_nu_lookup(self): + return self.nu_lookup def has_characteristic(self, uuid): - if uuid in self.characteristics.keys(): + if uuid in self.characteristics: return True return False @@ -148,6 +217,9 @@ class BLE_Bluez_Client: self.device.Disconnect(dbus_interface='org.bluez.Device1') if self.adapter: self.adapter.RemoveDevice(self.device) + self.device = None + self.nu_lookup = None + self.characteristics = dict() if self.adapter_props: self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0)) @@ -163,7 +235,7 @@ class BLE_Bluez_Client: # Console based BLE client for Cross Platform support class BLE_Console_Client: - def connect(self, devname, iface, srv_uuid): + def connect(self, devname, iface, chrc_names, fallback_srv_uuid): print("BLE client is running in console mode") print("\tThis could be due to your platform not being supported or dependencies not being met") print("\tPlease ensure all pre-requisites are met to run the full fledged client") @@ -172,11 +244,14 @@ class BLE_Console_Client: if resp != 'Y' and resp != 'y': return False print("BLECLI >> List available attributes of the connected device") - resp = input("BLECLI >> Is the service UUID '" + srv_uuid + "' listed among available attributes? [y/n] ") + resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ") if resp != 'Y' and resp != 'y': return False return True + def get_nu_lookup(self): + return None + def has_characteristic(self, uuid): resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ") if resp != 'Y' and resp != 'y': diff --git a/tools/esp_prov/transport/transport_ble.py b/tools/esp_prov/transport/transport_ble.py index 20cb144f18..af26a2c700 100644 --- a/tools/esp_prov/transport/transport_ble.py +++ b/tools/esp_prov/transport/transport_ble.py @@ -26,19 +26,28 @@ class Transport_BLE(Transport): # Calculate characteristic UUID for each endpoint nu_lookup[name] = service_uuid[:4] + '{:02x}'.format( int(nu_lookup[name], 16) & int(service_uuid[4:8], 16)) + service_uuid[8:] - self.name_uuid_lookup = nu_lookup # Get BLE client module self.cli = ble_cli.get_client() # Use client to connect to BLE device and bind to service - if not self.cli.connect(devname = devname, iface = 'hci0', srv_uuid = service_uuid): + if not self.cli.connect(devname=devname, iface='hci0', + chrc_names=nu_lookup.keys(), + fallback_srv_uuid=service_uuid): raise RuntimeError("Failed to initialize transport") - # Check if expected characteristics are provided by the service - for name in self.name_uuid_lookup.keys(): - if not self.cli.has_characteristic(self.name_uuid_lookup[name]): - raise RuntimeError("'" + name + "' endpoint not found") + # Irrespective of provided parameters, let the client + # generate a lookup table by reading advertisement data + # and characteristic user descriptors + self.name_uuid_lookup = self.cli.get_nu_lookup() + + # If that doesn't work, use the lookup table provided as parameter + if self.name_uuid_lookup is None: + self.name_uuid_lookup = nu_lookup + # Check if expected characteristics are provided by the service + for name in self.name_uuid_lookup.keys(): + if not self.cli.has_characteristic(self.name_uuid_lookup[name]): + raise RuntimeError("'" + name + "' endpoint not found") def __del__(self): # Make sure device is disconnected before application gets closed