mirror of
https://github.com/espressif/esp-idf.git
synced 2025-11-15 15:00:02 +01:00
ble-wifi-example-tests: Add fixes and cleanups to ble and wifi tests
This commit is contained in:
@@ -42,13 +42,15 @@ if platform.system() == 'Linux':
|
||||
|
||||
# BLE client (Linux Only) using Bluez and DBus
|
||||
class BLE_Bluez_Client:
|
||||
def __init__(self):
|
||||
self.adapter_props = None
|
||||
|
||||
def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
|
||||
self.devname = devname
|
||||
self.srv_uuid_fallback = fallback_srv_uuid
|
||||
self.chrc_names = [name.lower() for name in chrc_names]
|
||||
self.device = None
|
||||
self.adapter = None
|
||||
self.adapter_props = None
|
||||
self.services = None
|
||||
self.nu_lookup = None
|
||||
self.characteristics = dict()
|
||||
@@ -58,20 +60,53 @@ class BLE_Bluez_Client:
|
||||
bus = dbus.SystemBus()
|
||||
manager = dbus.Interface(bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager')
|
||||
objects = manager.GetManagedObjects()
|
||||
|
||||
adapter_path = None
|
||||
for path, interfaces in iteritems(objects):
|
||||
adapter = interfaces.get('org.bluez.Adapter1')
|
||||
if adapter is not None:
|
||||
if path.endswith(iface):
|
||||
self.adapter = dbus.Interface(bus.get_object('org.bluez', path), 'org.bluez.Adapter1')
|
||||
self.adapter_props = dbus.Interface(bus.get_object('org.bluez', path), 'org.freedesktop.DBus.Properties')
|
||||
adapter_path = path
|
||||
break
|
||||
|
||||
if self.adapter is None:
|
||||
raise RuntimeError('Bluetooth adapter not found')
|
||||
|
||||
# Power on bluetooth adapter
|
||||
self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(1))
|
||||
self.adapter.StartDiscovery()
|
||||
print('checking if adapter is powered on')
|
||||
for cnt in range(10, 0, -1):
|
||||
time.sleep(5)
|
||||
powered_on = self.adapter_props.Get('org.bluez.Adapter1', 'Powered')
|
||||
if powered_on == 1:
|
||||
# Set adapter props again with powered on value
|
||||
self.adapter_props = dbus.Interface(bus.get_object('org.bluez', adapter_path), 'org.freedesktop.DBus.Properties')
|
||||
print('bluetooth adapter powered on')
|
||||
break
|
||||
print('number of retries left({})'.format(cnt - 1))
|
||||
if powered_on == 0:
|
||||
raise RuntimeError('Failed to starte bluetooth adapter')
|
||||
|
||||
# Start discovery if not already discovering
|
||||
started_discovery = 0
|
||||
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
|
||||
if discovery_val == 0:
|
||||
print('starting discovery')
|
||||
self.adapter.StartDiscovery()
|
||||
# Set as start discovery is called
|
||||
started_discovery = 1
|
||||
for cnt in range(10, 0, -1):
|
||||
time.sleep(5)
|
||||
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
|
||||
if discovery_val == 1:
|
||||
print('start discovery successful')
|
||||
break
|
||||
print('number of retries left ({})'.format(cnt - 1))
|
||||
|
||||
if discovery_val == 0:
|
||||
print('start discovery failed')
|
||||
raise RuntimeError('Failed to start discovery')
|
||||
|
||||
retry = 10
|
||||
while (retry > 0):
|
||||
@@ -80,8 +115,11 @@ class BLE_Bluez_Client:
|
||||
print('Connecting...')
|
||||
# Wait for device to be discovered
|
||||
time.sleep(5)
|
||||
self._connect_()
|
||||
print('Connected')
|
||||
connected = self._connect_()
|
||||
if connected:
|
||||
print('Connected')
|
||||
else:
|
||||
return False
|
||||
print('Getting Services...')
|
||||
# Wait for services to be discovered
|
||||
time.sleep(5)
|
||||
@@ -92,7 +130,21 @@ class BLE_Bluez_Client:
|
||||
retry -= 1
|
||||
print('Retries left', retry)
|
||||
continue
|
||||
self.adapter.StopDiscovery()
|
||||
|
||||
# Call StopDiscovery() for corresponding StartDiscovery() session
|
||||
if started_discovery == 1:
|
||||
print('stopping discovery')
|
||||
self.adapter.StopDiscovery()
|
||||
for cnt in range(10, 0, -1):
|
||||
time.sleep(5)
|
||||
discovery_val = self.adapter_props.Get('org.bluez.Adapter1', 'Discovering')
|
||||
if discovery_val == 0:
|
||||
print('stop discovery successful')
|
||||
break
|
||||
print('number of retries left ({})'.format(cnt - 1))
|
||||
if discovery_val == 1:
|
||||
print('stop discovery failed')
|
||||
|
||||
return False
|
||||
|
||||
def _connect_(self):
|
||||
@@ -123,9 +175,26 @@ class BLE_Bluez_Client:
|
||||
if len(uuids) == 1:
|
||||
self.srv_uuid_adv = uuids[0]
|
||||
except dbus.exceptions.DBusException as e:
|
||||
print(e)
|
||||
raise RuntimeError(e)
|
||||
|
||||
self.device.Connect(dbus_interface='org.bluez.Device1')
|
||||
# Check device is connected successfully
|
||||
for cnt in range(10, 0, -1):
|
||||
time.sleep(5)
|
||||
device_conn = self.device.Get(
|
||||
'org.bluez.Device1',
|
||||
'Connected',
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
if device_conn == 1:
|
||||
print('device is connected')
|
||||
break
|
||||
print('number of retries left ({})'.format(cnt - 1))
|
||||
if device_conn == 0:
|
||||
print('failed to connect device')
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
print(e)
|
||||
self.device = None
|
||||
@@ -173,8 +242,8 @@ class BLE_Bluez_Client:
|
||||
continue
|
||||
try:
|
||||
readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
|
||||
except dbus.exceptions.DBusException:
|
||||
break
|
||||
except dbus.exceptions.DBusException as err:
|
||||
raise RuntimeError('Failed to read value for descriptor while getting services - {}'.format(err))
|
||||
found_name = ''.join(chr(b) for b in readval).lower()
|
||||
nu_lookup[found_name] = uuid
|
||||
break
|
||||
@@ -201,6 +270,8 @@ class BLE_Bluez_Client:
|
||||
|
||||
if not service_found:
|
||||
self.device.Disconnect(dbus_interface='org.bluez.Device1')
|
||||
# Check if device is disconnected successfully
|
||||
self._check_device_disconnected()
|
||||
if self.adapter:
|
||||
self.adapter.RemoveDevice(self.device)
|
||||
self.device = None
|
||||
@@ -219,6 +290,8 @@ class BLE_Bluez_Client:
|
||||
def disconnect(self):
|
||||
if self.device:
|
||||
self.device.Disconnect(dbus_interface='org.bluez.Device1')
|
||||
# Check if device is disconnected successfully
|
||||
self._check_device_disconnected()
|
||||
if self.adapter:
|
||||
self.adapter.RemoveDevice(self.device)
|
||||
self.device = None
|
||||
@@ -227,6 +300,20 @@ class BLE_Bluez_Client:
|
||||
if self.adapter_props:
|
||||
self.adapter_props.Set('org.bluez.Adapter1', 'Powered', dbus.Boolean(0))
|
||||
|
||||
def _check_device_disconnected(self):
|
||||
for cnt in range(10, 0, -1):
|
||||
time.sleep(5)
|
||||
device_conn = self.device.Get(
|
||||
'org.bluez.Device1',
|
||||
'Connected',
|
||||
dbus_interface='org.freedesktop.DBus.Properties')
|
||||
if device_conn == 0:
|
||||
print('device disconnected')
|
||||
break
|
||||
print('number of retries left ({})'.format(cnt - 1))
|
||||
if device_conn == 1:
|
||||
print('failed to disconnect device')
|
||||
|
||||
def send_data(self, characteristic_uuid, data):
|
||||
try:
|
||||
path = self.characteristics[characteristic_uuid]
|
||||
|
||||
Reference in New Issue
Block a user