ci: Fix blecent example test

(cherry picked from commit fc146a98e4)
This commit is contained in:
Shivani Tipnis
2020-11-20 12:13:15 +05:30
parent c33fc7821a
commit cbabe5bacc
2 changed files with 209 additions and 149 deletions

View File

@ -109,21 +109,21 @@ def test_example_app_ble_central(env, extra_data):
ble_client_obj.disconnect() ble_client_obj.disconnect()
# Check dut responses # Check dut responses
dut.expect("Connection established", timeout=30) dut.expect("Connection established", timeout=60)
dut.expect("Service discovery complete; status=0", timeout=30) dut.expect("Service discovery complete; status=0", timeout=60)
print("Service discovery passed\n\tService Discovery Status: 0") print("Service discovery passed\n\tService Discovery Status: 0")
dut.expect("GATT procedure initiated: read;", timeout=30) dut.expect("GATT procedure initiated: read;", timeout=60)
dut.expect("Read complete; status=0", timeout=30) dut.expect("Read complete; status=0", timeout=60)
print("Read passed\n\tSupportedNewAlertCategoryCharacteristic\n\tRead Status: 0") print("Read passed\n\tSupportedNewAlertCategoryCharacteristic\n\tRead Status: 0")
dut.expect("GATT procedure initiated: write;", timeout=30) dut.expect("GATT procedure initiated: write;", timeout=60)
dut.expect("Write complete; status=0", timeout=30) dut.expect("Write complete; status=0", timeout=60)
print("Write passed\n\tAlertNotificationControlPointCharacteristic\n\tWrite Status: 0") print("Write passed\n\tAlertNotificationControlPointCharacteristic\n\tWrite Status: 0")
dut.expect("GATT procedure initiated: write;", timeout=30) dut.expect("GATT procedure initiated: write;", timeout=60)
dut.expect("Subscribe complete; status=0", timeout=30) dut.expect("Subscribe complete; status=0", timeout=60)
print("Subscribe passed\n\tClientCharacteristicConfigurationDescriptor\n\tSubscribe Status: 0") print("Subscribe passed\n\tClientCharacteristicConfigurationDescriptor\n\tSubscribe Status: 0")

View File

@ -47,26 +47,33 @@ srv_added_old_cnt = 0
srv_added_new_cnt = 0 srv_added_new_cnt = 0
verify_signal_check = 0 verify_signal_check = 0
blecent_retry_check_cnt = 0 blecent_retry_check_cnt = 0
gatt_app_retry_check_cnt = 0
verify_service_cnt = 0 verify_service_cnt = 0
verify_readchars_cnt = 0 verify_readchars_cnt = 0
adv_retry_check_cnt = 0
blecent_adv_uuid = '1811' blecent_adv_uuid = '1811'
gatt_app_obj_check = False gatt_app_obj_check = False
gatt_app_reg_check = False gatt_app_reg_check = False
adv_checks_done = False
gatt_checks_done = False
adv_data_check = False adv_data_check = False
adv_reg_check = False adv_reg_check = False
read_req_check = False read_req_check = False
write_req_check = False write_req_check = False
subscribe_req_check = False subscribe_req_check = False
ble_hr_chrc = False ble_hr_chrc = False
discovery_start = False
DISCOVERY_START = False signal_caught = False
SIGNAL_CAUGHT = False test_checks_pass = False
adv_stop = False
TEST_CHECKS_PASS = False services_resolved = False
ADV_STOP = False service_uuid_found = False
adapter_on = False
SERVICES_RESOLVED = False device_connected = False
SERVICE_UUID_FOUND = False gatt_app_registered = False
adv_registered = False
adv_active_instance = False
chrc_value_cnt = False
BLUEZ_SERVICE_NAME = 'org.bluez' BLUEZ_SERVICE_NAME = 'org.bluez'
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
@ -81,13 +88,6 @@ LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1' GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1' GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
ADAPTER_ON = False
DEVICE_CONNECTED = False
GATT_APP_REGISTERED = False
ADV_REGISTERED = False
ADV_ACTIVE_INSTANCE = False
CHRC_VALUE_CNT = False
# Set up the main loop. # Set up the main loop.
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
@ -100,7 +100,7 @@ def verify_signal_is_caught():
global verify_signal_check global verify_signal_check
verify_signal_check += 1 verify_signal_check += 1
if (not SIGNAL_CAUGHT and verify_signal_check == 15) or (SIGNAL_CAUGHT): if (not signal_caught and verify_signal_check == 15) or (signal_caught):
if event_loop.is_running(): if event_loop.is_running():
event_loop.quit() event_loop.quit()
return False # polling for checks will stop return False # polling for checks will stop
@ -112,25 +112,25 @@ def set_props_status(props):
""" """
Set Adapter status if it is powered on or off Set Adapter status if it is powered on or off
""" """
global ADAPTER_ON, SERVICES_RESOLVED, GATT_OBJ_REMOVED, GATT_APP_REGISTERED, \ global adapter_on, services_resolved, GATT_OBJ_REMOVED, gatt_app_registered, \
ADV_REGISTERED, ADV_ACTIVE_INSTANCE, DEVICE_CONNECTED, CHRC_VALUE, CHRC_VALUE_CNT, \ adv_registered, adv_active_instance, device_connected, CHRC_VALUE, chrc_value_cnt, \
SIGNAL_CAUGHT signal_caught
is_service_uuid = False is_service_uuid = False
# Signal caught for change in Adapter Powered property # Signal caught for change in Adapter Powered property
if 'Powered' in props: if 'Powered' in props:
if props['Powered'] == 1: if props['Powered'] == 1:
SIGNAL_CAUGHT = True signal_caught = True
ADAPTER_ON = True adapter_on = True
else: else:
SIGNAL_CAUGHT = True signal_caught = True
ADAPTER_ON = False adapter_on = False
if 'ServicesResolved' in props: if 'ServicesResolved' in props:
if props['ServicesResolved'] == 1: if props['ServicesResolved'] == 1:
SIGNAL_CAUGHT = True signal_caught = True
SERVICES_RESOLVED = True services_resolved = True
else: else:
SIGNAL_CAUGHT = True signal_caught = True
SERVICES_RESOLVED = False services_resolved = False
if 'UUIDs' in props: if 'UUIDs' in props:
# Signal caught for add/remove GATT data having service uuid # Signal caught for add/remove GATT data having service uuid
for uuid in props['UUIDs']: for uuid in props['UUIDs']:
@ -139,31 +139,31 @@ def set_props_status(props):
if not is_service_uuid: if not is_service_uuid:
# Signal caught for removing GATT data having service uuid # Signal caught for removing GATT data having service uuid
# and for unregistering GATT application # and for unregistering GATT application
GATT_APP_REGISTERED = False gatt_app_registered = False
lib_gatt.GATT_APP_OBJ = False lib_gatt.GATT_APP_OBJ = False
if 'ActiveInstances' in props: if 'ActiveInstances' in props:
# Signal caught for Advertising - add/remove Instances property # Signal caught for Advertising - add/remove Instances property
if props['ActiveInstances'] == 1: if props['ActiveInstances'] == 1:
ADV_ACTIVE_INSTANCE = True adv_active_instance = True
elif props['ActiveInstances'] == 0: elif props['ActiveInstances'] == 0:
ADV_ACTIVE_INSTANCE = False adv_active_instance = False
ADV_REGISTERED = False adv_registered = False
lib_gap.ADV_OBJ = False lib_gap.ADV_OBJ = False
if 'Connected' in props: if 'Connected' in props:
# Signal caught for device connect/disconnect # Signal caught for device connect/disconnect
if props['Connected'] == 1: if props['Connected'] == 1:
SIGNAL_CAUGHT = True signal_caught = True
DEVICE_CONNECTED = True device_connected = True
else: else:
SIGNAL_CAUGHT = True signal_caught = True
DEVICE_CONNECTED = False device_connected = False
if 'Value' in props: if 'Value' in props:
# Signal caught for change in chars value # Signal caught for change in chars value
if ble_hr_chrc: if ble_hr_chrc:
CHRC_VALUE_CNT += 1 chrc_value_cnt += 1
print(props['Value']) print(props['Value'])
if CHRC_VALUE_CNT == 10: if chrc_value_cnt == 10:
SIGNAL_CAUGHT = True signal_caught = True
return False return False
return False return False
@ -221,9 +221,9 @@ class BLE_Bluez_Client:
Discover Bluetooth Adapter Discover Bluetooth Adapter
Power On Bluetooth Adapter Power On Bluetooth Adapter
''' '''
global verify_signal_check, SIGNAL_CAUGHT, ADAPTER_ON global verify_signal_check, signal_caught, adapter_on
verify_signal_check = 0 verify_signal_check = 0
ADAPTER_ON = False adapter_on = False
try: try:
print("discovering adapter...") print("discovering adapter...")
for path, interfaces in self.ble_objs.items(): for path, interfaces in self.ble_objs.items():
@ -247,7 +247,7 @@ class BLE_Bluez_Client:
print("bluetooth adapter discovered") print("bluetooth adapter discovered")
# Check if adapter is already powered on # Check if adapter is already powered on
if ADAPTER_ON: if adapter_on:
print("Adapter already powered on") print("Adapter already powered on")
return True return True
@ -256,11 +256,11 @@ class BLE_Bluez_Client:
self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(1)) self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(1))
SIGNAL_CAUGHT = False signal_caught = False
GLib.timeout_add_seconds(5, verify_signal_is_caught) GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run() event_loop.run()
if ADAPTER_ON: if adapter_on:
print("bluetooth adapter powered on") print("bluetooth adapter powered on")
return True return True
else: else:
@ -275,14 +275,14 @@ class BLE_Bluez_Client:
Connect to the device discovered Connect to the device discovered
Retry 10 times to discover and connect to device Retry 10 times to discover and connect to device
''' '''
global DISCOVERY_START, SIGNAL_CAUGHT, DEVICE_CONNECTED, verify_signal_check global discovery_start, signal_caught, device_connected, verify_signal_check
device_found = False device_found = False
DEVICE_CONNECTED = False device_connected = False
try: try:
self.adapter.StartDiscovery() self.adapter.StartDiscovery()
print("\nStarted Discovery") print("\nStarted Discovery")
DISCOVERY_START = True discovery_start = True
for retry_cnt in range(10,0,-1): for retry_cnt in range(10,0,-1):
verify_signal_check = 0 verify_signal_check = 0
@ -295,10 +295,10 @@ class BLE_Bluez_Client:
if device_found: if device_found:
self.device.Connect(dbus_interface=DEVICE_IFACE) self.device.Connect(dbus_interface=DEVICE_IFACE)
time.sleep(15) time.sleep(15)
SIGNAL_CAUGHT = False signal_caught = False
GLib.timeout_add_seconds(5, verify_signal_is_caught) GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run() event_loop.run()
if DEVICE_CONNECTED: if device_connected:
print("\nConnected to device") print("\nConnected to device")
return True return True
else: else:
@ -361,21 +361,21 @@ class BLE_Bluez_Client:
''' '''
Verify service uuid found Verify service uuid found
''' '''
global SERVICE_UUID_FOUND global service_uuid_found
srv_uuid_found = [uuid for uuid in self.srv_uuid if service_uuid in uuid] srv_uuid_found = [uuid for uuid in self.srv_uuid if service_uuid in uuid]
if srv_uuid_found: if srv_uuid_found:
SERVICE_UUID_FOUND = True service_uuid_found = True
def get_services(self, service_uuid=None): def get_services(self, service_uuid=None):
''' '''
Retrieve Services found in the device connected Retrieve Services found in the device connected
''' '''
global SIGNAL_CAUGHT, SERVICE_UUID_FOUND, SERVICES_RESOLVED, verify_signal_check global signal_caught, service_uuid_found, services_resolved, verify_signal_check
verify_signal_check = 0 verify_signal_check = 0
SERVICE_UUID_FOUND = False service_uuid_found = False
SERVICES_RESOLVED = False services_resolved = False
SIGNAL_CAUGHT = False signal_caught = False
try: try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
@ -387,12 +387,12 @@ class BLE_Bluez_Client:
GLib.timeout_add_seconds(5, verify_signal_is_caught) GLib.timeout_add_seconds(5, verify_signal_is_caught)
om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler) om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler)
event_loop.run() event_loop.run()
if not SERVICES_RESOLVED: if not services_resolved:
raise Exception("Services not found...") raise Exception("Services not found...")
if service_uuid: if service_uuid:
self.verify_service_uuid_found(service_uuid) self.verify_service_uuid_found(service_uuid)
if not SERVICE_UUID_FOUND: if not service_uuid_found:
raise Exception("Service with uuid: %s not found..." % service_uuid) raise Exception("Service with uuid: %s not found..." % service_uuid)
# Services found # Services found
@ -405,7 +405,7 @@ class BLE_Bluez_Client:
''' '''
Add services found as lib_ble_client obj Add services found as lib_ble_client obj
''' '''
global chrc, chrc_discovered, SIGNAL_CAUGHT global chrc, chrc_discovered, signal_caught
chrc_val = None chrc_val = None
if self.device and path.startswith(self.device.object_path): if self.device and path.startswith(self.device.object_path):
@ -418,17 +418,17 @@ class BLE_Bluez_Client:
chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE) chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
uuid = chrc_props['UUID'] uuid = chrc_props['UUID']
self.chars[path] = chrc_val, chrc_flags, uuid self.chars[path] = chrc_val, chrc_flags, uuid
SIGNAL_CAUGHT = True signal_caught = True
def read_chars(self): def read_chars(self):
''' '''
Read characteristics found in the device connected Read characteristics found in the device connected
''' '''
global iface_added, chrc_discovered, SIGNAL_CAUGHT, verify_signal_check global iface_added, chrc_discovered, signal_caught, verify_signal_check
verify_signal_check = 0 verify_signal_check = 0
chrc_discovered = False chrc_discovered = False
iface_added = False iface_added = False
SIGNAL_CAUGHT = False signal_caught = False
try: try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE) om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
@ -484,7 +484,7 @@ class BLE_Bluez_Client:
Retrieve updated value Retrieve updated value
Stop Notifications Stop Notifications
''' '''
global ble_hr_chrc, verify_signal_check, SIGNAL_CAUGHT, CHRC_VALUE_CNT global ble_hr_chrc, verify_signal_check, signal_caught, chrc_value_cnt
srv_path = None srv_path = None
chrc = None chrc = None
@ -492,7 +492,7 @@ class BLE_Bluez_Client:
chrc_path = None chrc_path = None
chars_ret = None chars_ret = None
ble_hr_chrc = True ble_hr_chrc = True
CHRC_VALUE_CNT = 0 chrc_value_cnt = 0
try: try:
# Get HR Measurement characteristic # Get HR Measurement characteristic
@ -523,7 +523,7 @@ class BLE_Bluez_Client:
chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE) chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE)
chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
SIGNAL_CAUGHT = False signal_caught = False
verify_signal_check = 0 verify_signal_check = 0
GLib.timeout_add_seconds(5, verify_signal_is_caught) GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run() event_loop.run()
@ -538,16 +538,16 @@ class BLE_Bluez_Client:
print(traceback.format_exc()) print(traceback.format_exc())
return False return False
def create_gatt_app(self): def create_and_reg_gatt_app(self):
''' '''
Create GATT data Create GATT data
Register GATT Application Register GATT Application
''' '''
global gatt_app_obj, gatt_manager_iface_obj, GATT_APP_REGISTERED global gatt_app_obj, gatt_manager_iface_obj, gatt_app_registered
gatt_app_obj = None gatt_app_obj = None
gatt_manager_iface_obj = None gatt_manager_iface_obj = None
GATT_APP_REGISTERED = False gatt_app_registered = False
lib_gatt.GATT_APP_OBJ = False lib_gatt.GATT_APP_OBJ = False
try: try:
@ -566,15 +566,15 @@ class BLE_Bluez_Client:
''' '''
GATT Application Register success handler GATT Application Register success handler
''' '''
global GATT_APP_REGISTERED global gatt_app_registered
GATT_APP_REGISTERED = True gatt_app_registered = True
def gatt_app_error_handler(self, error): def gatt_app_error_handler(self, error):
''' '''
GATT Application Register error handler GATT Application Register error handler
''' '''
global GATT_APP_REGISTERED global gatt_app_registered
GATT_APP_REGISTERED = False gatt_app_registered = False
def start_advertising(self, adv_host_name, adv_iface_index, adv_type, adv_uuid): def start_advertising(self, adv_host_name, adv_iface_index, adv_type, adv_uuid):
''' '''
@ -582,28 +582,47 @@ class BLE_Bluez_Client:
Register Advertisement Register Advertisement
Start Advertising Start Advertising
''' '''
global le_adv_obj, le_adv_manager_iface_obj, ADV_ACTIVE_INSTANCE, ADV_REGISTERED global le_adv_obj, le_adv_manager_iface_obj, adv_active_instance, adv_registered
le_adv_obj = None le_adv_obj = None
le_adv_manager_iface_obj = None le_adv_manager_iface_obj = None
le_adv_iface_path = None le_adv_iface_path = None
ADV_ACTIVE_INSTANCE = False adv_active_instance = False
ADV_REGISTERED = False adv_registered = False
lib_gap.ADV_OBJ = False lib_gap.ADV_OBJ = False
try: try:
print("Advertising started") print("Advertising started")
gatt_app_ret = self.create_gatt_app() gatt_app_ret = self.create_and_reg_gatt_app()
if not gatt_app_ret: # Check if gatt app create and register command
raise Exception # is sent successfully
assert gatt_app_ret
GLib.timeout_add_seconds(2, self.verify_gatt_app_reg)
event_loop.run()
# Check if Gatt Application is registered
assert gatt_app_registered
for path,interface in self.ble_objs.items(): for path,interface in self.ble_objs.items():
if LE_ADVERTISING_MANAGER_IFACE in interface: if LE_ADVERTISING_MANAGER_IFACE in interface:
le_adv_iface_path = path le_adv_iface_path = path
if le_adv_iface_path is None: # Check LEAdvertisingManager1 interface is found
raise Exception('\n Cannot start advertising. LEAdvertisingManager1 Interface not found') assert le_adv_iface_path, '\n Cannot start advertising. LEAdvertisingManager1 Interface not found'
# Get device when connected
if not self.device:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
for path, interfaces in self.ble_objs.items():
if DEVICE_IFACE not in interfaces.keys():
continue
device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name) le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name)
le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE) le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE)
@ -612,13 +631,30 @@ class BLE_Bluez_Client:
reply_handler=self.adv_handler, reply_handler=self.adv_handler,
error_handler=self.adv_error_handler) error_handler=self.adv_error_handler)
GLib.timeout_add_seconds(2, self.verify_adv_reg)
event_loop.run()
# Check if advertising is registered
assert adv_registered
ret_val = self.verify_blecent_app()
# Check if blecent has passed
assert ret_val
except Exception:
print(traceback.format_exc())
return False
def verify_blecent_app(self):
'''
Verify read/write/subscribe operations
'''
try:
GLib.timeout_add_seconds(2, self.verify_blecent) GLib.timeout_add_seconds(2, self.verify_blecent)
event_loop.run() event_loop.run()
if TEST_CHECKS_PASS: return test_checks_pass
return True
else:
raise Exception
except Exception: except Exception:
print(traceback.format_exc()) print(traceback.format_exc())
@ -628,47 +664,83 @@ class BLE_Bluez_Client:
''' '''
Advertisement Register success handler Advertisement Register success handler
''' '''
global ADV_REGISTERED global adv_registered
ADV_REGISTERED = True adv_registered = True
def adv_error_handler(self, error): def adv_error_handler(self, error):
''' '''
Advertisement Register error handler Advertisement Register error handler
''' '''
global ADV_REGISTERED global adv_registered
ADV_REGISTERED = False adv_registered = False
def verify_gatt_app_reg(self):
"""
Verify GATT Application is registered
"""
global gatt_app_retry_check_cnt, gatt_checks_done
gatt_app_retry_check_cnt = 0
gatt_checks_done = False
# Check for success
if lib_gatt.GATT_APP_OBJ:
print("GATT Data created")
if gatt_app_registered:
print("GATT Application registered")
gatt_checks_done = True
if gatt_app_retry_check_cnt == 20:
if not gatt_app_registered:
print("Failure: GATT Application could not be registered")
gatt_checks_done = True
# End polling if app is registered or cnt has reached 10
if gatt_checks_done:
if event_loop.is_running():
event_loop.quit()
return False # polling for checks will stop
gatt_app_retry_check_cnt += 1
# Default return True - polling for checks will continue
return True
def verify_adv_reg(self):
"""
Verify Advertisement is registered
"""
global adv_retry_check_cnt, adv_checks_done
adv_retry_check_cnt = 0
adv_checks_done = False
if lib_gap.ADV_OBJ:
print("Advertising data created")
if adv_registered or adv_active_instance:
print("Advertisement registered")
adv_checks_done = True
if adv_retry_check_cnt == 10:
if not adv_registered and not adv_active_instance:
print("Failure: Advertisement could not be registered")
adv_checks_done = True
# End polling if success or cnt has reached 10
if adv_checks_done:
if event_loop.is_running():
event_loop.quit()
return False # polling for checks will stop
adv_retry_check_cnt += 1
# Default return True - polling for checks will continue
return True
def verify_blecent(self): def verify_blecent(self):
""" """
Verify blecent test commands are successful Verify blecent test commands are successful
""" """
global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\ global blecent_retry_check_cnt, read_req_check, write_req_check,\
adv_data_check, adv_reg_check, read_req_check, write_req_check,\ subscribe_req_check, test_checks_pass
subscribe_req_check, TEST_CHECKS_PASS
# Get device when connected
if not self.device:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
for path, interfaces in self.ble_objs.items():
if DEVICE_IFACE not in interfaces.keys():
continue
device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, path), DBUS_PROP_IFACE)
device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
self.device = self.bus.get_object(BLUEZ_SERVICE_NAME, path)
# Check for failures after 10 retries # Check for failures after 10 retries
if blecent_retry_check_cnt == 10: if blecent_retry_check_cnt == 10:
# check for failures # check for failures
if not gatt_app_obj_check:
print("Failure: GATT Data could not be created")
if not gatt_app_reg_check:
print("Failure: GATT Application could not be registered")
if not adv_data_check:
print("Failure: Advertising data could not be created")
if not adv_reg_check:
print("Failure: Advertisement could not be registered")
if not read_req_check: if not read_req_check:
print("Failure: Read Request not received") print("Failure: Read Request not received")
if not write_req_check: if not write_req_check:
@ -677,23 +749,11 @@ class BLE_Bluez_Client:
print("Failure: Subscribe Request not received") print("Failure: Subscribe Request not received")
# Blecent Test failed # Blecent Test failed
TEST_CHECKS_PASS = False test_checks_pass = False
if subscribe_req_check: if subscribe_req_check:
lib_gatt.alert_status_char_obj.StopNotify() lib_gatt.alert_status_char_obj.StopNotify()
else: else:
# Check for success # Check for success
if not gatt_app_obj_check and lib_gatt.GATT_APP_OBJ:
print("GATT Data created")
gatt_app_obj_check = True
if not gatt_app_reg_check and GATT_APP_REGISTERED:
print("GATT Application registered")
gatt_app_reg_check = True
if not adv_data_check and lib_gap.ADV_OBJ:
print("Advertising data created")
adv_data_check = True
if not adv_reg_check and ADV_REGISTERED and ADV_ACTIVE_INSTANCE:
print("Advertisement registered")
adv_reg_check = True
if not read_req_check and lib_gatt.CHAR_READ: if not read_req_check and lib_gatt.CHAR_READ:
read_req_check = True read_req_check = True
if not write_req_check and lib_gatt.CHAR_WRITE: if not write_req_check and lib_gatt.CHAR_WRITE:
@ -704,10 +764,10 @@ class BLE_Bluez_Client:
if read_req_check and write_req_check and subscribe_req_check: if read_req_check and write_req_check and subscribe_req_check:
# all checks passed # all checks passed
# Blecent Test passed # Blecent Test passed
TEST_CHECKS_PASS = True test_checks_pass = True
lib_gatt.alert_status_char_obj.StopNotify() lib_gatt.alert_status_char_obj.StopNotify()
if (blecent_retry_check_cnt == 10 or TEST_CHECKS_PASS): if (blecent_retry_check_cnt == 10 or test_checks_pass):
if event_loop.is_running(): if event_loop.is_running():
event_loop.quit() event_loop.quit()
return False # polling for checks will stop return False # polling for checks will stop
@ -723,7 +783,7 @@ class BLE_Bluez_Client:
Verify cleanup is successful Verify cleanup is successful
""" """
global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\ global blecent_retry_check_cnt, gatt_app_obj_check, gatt_app_reg_check,\
adv_data_check, adv_reg_check, ADV_STOP adv_data_check, adv_reg_check, adv_stop
if blecent_retry_check_cnt == 0: if blecent_retry_check_cnt == 0:
gatt_app_obj_check = False gatt_app_obj_check = False
@ -744,24 +804,24 @@ class BLE_Bluez_Client:
print("Warning: Advertisement could not be unregistered") print("Warning: Advertisement could not be unregistered")
# Blecent Test failed # Blecent Test failed
ADV_STOP = False adv_stop = False
else: else:
# Check for success # Check for success
if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ: if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ:
print("GATT Data removed") print("GATT Data removed")
gatt_app_obj_check = True gatt_app_obj_check = True
if not gatt_app_reg_check and not GATT_APP_REGISTERED: if not gatt_app_reg_check and not gatt_app_registered:
print("GATT Application unregistered") print("GATT Application unregistered")
gatt_app_reg_check = True gatt_app_reg_check = True
if not adv_data_check and not adv_reg_check and not (ADV_REGISTERED or ADV_ACTIVE_INSTANCE or lib_gap.ADV_OBJ): if not adv_data_check and not adv_reg_check and not (adv_registered or adv_active_instance or lib_gap.ADV_OBJ):
print("Advertising data removed") print("Advertising data removed")
print("Advertisement unregistered") print("Advertisement unregistered")
adv_data_check = True adv_data_check = True
adv_reg_check = True adv_reg_check = True
# all checks passed # all checks passed
ADV_STOP = True adv_stop = True
if (blecent_retry_check_cnt == 10 or ADV_STOP): if (blecent_retry_check_cnt == 10 or adv_stop):
if event_loop.is_running(): if event_loop.is_running():
event_loop.quit() event_loop.quit()
return False # polling for checks will stop return False # polling for checks will stop
@ -783,7 +843,7 @@ class BLE_Bluez_Client:
Adapter is powered off Adapter is powered off
''' '''
try: try:
global blecent_retry_check_cnt, DISCOVERY_START, verify_signal_check, SIGNAL_CAUGHT global blecent_retry_check_cnt, discovery_start, verify_signal_check, signal_caught
blecent_retry_check_cnt = 0 blecent_retry_check_cnt = 0
verify_signal_check = 0 verify_signal_check = 0
@ -791,14 +851,14 @@ class BLE_Bluez_Client:
self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler) self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
if ADV_REGISTERED: if adv_registered:
# Unregister Advertisement # Unregister Advertisement
le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path()) le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path())
# Remove Advertising data # Remove Advertising data
dbus.service.Object.remove_from_connection(le_adv_obj) dbus.service.Object.remove_from_connection(le_adv_obj)
if GATT_APP_REGISTERED: if gatt_app_registered:
# Unregister GATT Application # Unregister GATT Application
gatt_manager_iface_obj.UnregisterApplication(gatt_app_obj.get_path()) gatt_manager_iface_obj.UnregisterApplication(gatt_app_obj.get_path())
@ -808,10 +868,10 @@ class BLE_Bluez_Client:
GLib.timeout_add_seconds(5, self.verify_blecent_disconnect) GLib.timeout_add_seconds(5, self.verify_blecent_disconnect)
event_loop.run() event_loop.run()
if ADV_STOP: if adv_stop:
print("Stop Advertising status: ", ADV_STOP) print("Stop Advertising status: ", adv_stop)
else: else:
print("Warning: Stop Advertising status: ", ADV_STOP) print("Warning: Stop Advertising status: ", adv_stop)
# Disconnect device # Disconnect device
if self.device: if self.device:
@ -820,16 +880,16 @@ class BLE_Bluez_Client:
if self.adapter: if self.adapter:
self.adapter.RemoveDevice(self.device) self.adapter.RemoveDevice(self.device)
self.device = None self.device = None
if DISCOVERY_START: if discovery_start:
self.adapter.StopDiscovery() self.adapter.StopDiscovery()
DISCOVERY_START = False discovery_start = False
time.sleep(15) time.sleep(15)
SIGNAL_CAUGHT = False signal_caught = False
GLib.timeout_add_seconds(5, verify_signal_is_caught) GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run() event_loop.run()
if not DEVICE_CONNECTED: if not device_connected:
print("device disconnected") print("device disconnected")
else: else:
print("Warning: device could not be disconnected") print("Warning: device could not be disconnected")