import os
import sys
import re
-from threading import Thread
+import threading
+import traceback
+import Queue
import subprocess
try:
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
-def blehr_client_task(dut_addr, dut):
+def blehr_client_task(hr_obj, dut_addr):
interface = 'hci0'
ble_devname = 'blehr_sensor_1.0'
hr_srv_uuid = '180d'
# Connect BLE Device
is_connected = ble_client_obj.connect()
if not is_connected:
- Utility.console_log("Connection to device ", ble_devname, "failed !!")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
- return
+ raise RuntimeError("Connection to device " + str(ble_devname) + " failed !!")
# Read Services
services_ret = ble_client_obj.get_services()
if services_ret:
- print("\nServices\n")
- print(services_ret)
+ Utility.console_log("\nServices\n")
+ Utility.console_log(str(services_ret))
else:
- print("Failure: Read Services failed")
ble_client_obj.disconnect()
- return
+ raise RuntimeError("Failure: Read Services failed")
'''
Blehr application run:
'''
blehr_ret = ble_client_obj.hr_update_simulation(hr_srv_uuid, hr_char_uuid)
if blehr_ret:
- print("Success: blehr example test passed")
+ Utility.console_log("Success: blehr example test passed")
else:
- print("Failure: blehr example test failed")
+ raise RuntimeError("Failure: blehr example test failed")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
+class BleHRThread(threading.Thread):
+ def __init__(self, dut_addr, exceptions_queue):
+ threading.Thread.__init__(self)
+ self.dut_addr = dut_addr
+ self.exceptions_queue = exceptions_queue
+
+ def run(self):
+ try:
+ blehr_client_task(self, self.dut_addr)
+ except Exception:
+ self.exceptions_queue.put(traceback.format_exc(), block=False)
+
+
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_example_app_ble_hr(env, extra_data):
"""
4. Updated value is retrieved
5. Stop Notifications
"""
- try:
- # Acquire DUT
- dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr")
-
- # Get binary file
- binary_file = os.path.join(dut.app.binary_path, "blehr.bin")
- bin_size = os.path.getsize(binary_file)
- IDF.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024))
- IDF.check_performance("blehr_bin_size", bin_size // 1024)
-
- # Upload binary and start testing
- Utility.console_log("Starting blehr simple example test app")
- dut.start_app()
-
- subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
-
- # Get device address from dut
- dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
-
- # Starting a py-client in a separate thread
- thread1 = Thread(target=blehr_client_task, args=(dut_addr,dut,))
- thread1.start()
- thread1.join()
-
- # Check dut responses
- dut.expect("subscribe event; cur_notify=1", timeout=30)
- dut.expect("GATT procedure initiated: notify;", timeout=30)
- dut.expect("subscribe event; cur_notify=0", timeout=30)
- dut.expect("disconnect;", timeout=30)
-
- except Exception as e:
- sys.exit(e)
+ subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
+ subprocess.check_output(['hciconfig','hci0','reset'])
+
+ # Acquire DUT
+ dut = env.get_dut("blehr", "examples/bluetooth/nimble/blehr")
+
+ # Get binary file
+ binary_file = os.path.join(dut.app.binary_path, "blehr.bin")
+ bin_size = os.path.getsize(binary_file)
+ IDF.log_performance("blehr_bin_size", "{}KB".format(bin_size // 1024))
+ IDF.check_performance("blehr_bin_size", bin_size // 1024)
+
+ # Upload binary and start testing
+ Utility.console_log("Starting blehr simple example test app")
+ dut.start_app()
+ dut.reset()
+
+ # Get device address from dut
+ dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
+ exceptions_queue = Queue.Queue()
+ # Starting a py-client in a separate thread
+ blehr_thread_obj = BleHRThread(dut_addr, exceptions_queue)
+ blehr_thread_obj.start()
+ blehr_thread_obj.join()
+
+ exception_msg = None
+ while True:
+ try:
+ exception_msg = exceptions_queue.get(block=False)
+ except Queue.Empty:
+ break
+ else:
+ Utility.console_log("\n" + exception_msg)
+
+ if exception_msg:
+ raise Exception("Thread did not run successfully")
+
+ # Check dut responses
+ dut.expect("subscribe event; cur_notify=1", timeout=30)
+ dut.expect("subscribe event; cur_notify=0", timeout=30)
+ dut.expect("disconnect;", timeout=30)
if __name__ == '__main__':
import os
import sys
import re
-from threading import Thread
+import Queue
+import traceback
+import threading
import subprocess
try:
# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
-def bleprph_client_task(dut_addr, dut):
+def bleprph_client_task(prph_obj, dut, dut_addr):
interface = 'hci0'
ble_devname = 'nimble-bleprph'
srv_uuid = '2f12'
# Connect BLE Device
is_connected = ble_client_obj.connect()
if not is_connected:
- Utility.console_log("Connection to device ", ble_devname, "failed !!")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
- return
+ raise RuntimeError("Connection to device " + ble_devname + " failed !!")
# Check dut responses
dut.expect("GAP procedure initiated: advertise;", timeout=30)
# Read Services
services_ret = ble_client_obj.get_services(srv_uuid)
if services_ret:
- print("\nServices\n")
- print(services_ret)
+ Utility.console_log("\nServices\n")
+ Utility.console_log(str(services_ret))
else:
- print("Failure: Read Services failed")
ble_client_obj.disconnect()
- return
+ raise RuntimeError("Failure: Read Services failed")
# Read Characteristics
chars_ret = {}
if chars_ret:
Utility.console_log("\nCharacteristics retrieved")
for path, props in chars_ret.items():
- print("\n\tCharacteristic: ", path)
- print("\tCharacteristic UUID: ", props[2])
- print("\tValue: ", props[0])
- print("\tProperties: : ", props[1])
+ Utility.console_log("\n\tCharacteristic: " + str(path))
+ Utility.console_log("\tCharacteristic UUID: " + str(props[2]))
+ Utility.console_log("\tValue: " + str(props[0]))
+ Utility.console_log("\tProperties: : " + str(props[1]))
else:
- print("Failure: Read Characteristics failed")
ble_client_obj.disconnect()
- return
+ raise RuntimeError("Failure: Read Characteristics failed")
'''
Write Characteristics
if chars_ret_on_write:
Utility.console_log("\nCharacteristics after write operation")
for path, props in chars_ret_on_write.items():
- print("\n\tCharacteristic:", path)
- print("\tCharacteristic UUID: ", props[2])
- print("\tValue:", props[0])
- print("\tProperties: : ", props[1])
+ Utility.console_log("\n\tCharacteristic:" + str(path))
+ Utility.console_log("\tCharacteristic UUID: " + str(props[2]))
+ Utility.console_log("\tValue:" + str(props[0]))
+ Utility.console_log("\tProperties: : " + str(props[1]))
else:
- print("Failure: Write Characteristics failed")
ble_client_obj.disconnect()
- return
+ raise RuntimeError("Failure: Write Characteristics failed")
# Call disconnect to perform cleanup operations before exiting application
ble_client_obj.disconnect()
+class BlePrphThread(threading.Thread):
+ def __init__(self, dut, dut_addr, exceptions_queue):
+ threading.Thread.__init__(self)
+ self.dut = dut
+ self.dut_addr = dut_addr
+ self.exceptions_queue = exceptions_queue
+
+ def run(self):
+ try:
+ bleprph_client_task(self, self.dut, self.dut_addr)
+ except Exception:
+ self.exceptions_queue.put(traceback.format_exc(), block=False)
+
+
@IDF.idf_example_test(env_tag="Example_WIFI_BT")
def test_example_app_ble_peripheral(env, extra_data):
"""
4. Read Characteristics
5. Write Characteristics
"""
- try:
-
- # Acquire DUT
- dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph")
-
- # Get binary file
- binary_file = os.path.join(dut.app.binary_path, "bleprph.bin")
- bin_size = os.path.getsize(binary_file)
- IDF.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024))
- IDF.check_performance("bleprph_bin_size", bin_size // 1024)
+ subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
+ subprocess.check_output(['hciconfig','hci0','reset'])
+
+ # Acquire DUT
+ dut = env.get_dut("bleprph", "examples/bluetooth/nimble/bleprph")
+
+ # Get binary file
+ binary_file = os.path.join(dut.app.binary_path, "bleprph.bin")
+ bin_size = os.path.getsize(binary_file)
+ IDF.log_performance("bleprph_bin_size", "{}KB".format(bin_size // 1024))
+ IDF.check_performance("bleprph_bin_size", bin_size // 1024)
+
+ # Upload binary and start testing
+ Utility.console_log("Starting bleprph simple example test app")
+ dut.start_app()
+ dut.reset()
+
+ # Get device address from dut
+ dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
+
+ exceptions_queue = Queue.Queue()
+ # Starting a py-client in a separate thread
+ bleprph_thread_obj = BlePrphThread(dut, dut_addr, exceptions_queue)
+ bleprph_thread_obj.start()
+ bleprph_thread_obj.join()
+
+ exception_msg = None
+ while True:
+ try:
+ exception_msg = exceptions_queue.get(block=False)
+ except Queue.Empty:
+ break
+ else:
+ Utility.console_log("\n" + exception_msg)
+
+ if exception_msg:
+ raise Exception("Thread did not run successfully")
- # Upload binary and start testing
- Utility.console_log("Starting bleprph simple example test app")
- dut.start_app()
-
- subprocess.check_output(['rm','-rf','/var/lib/bluetooth/*'])
-
- # Get device address from dut
- dut_addr = dut.expect(re.compile(r"Device Address: ([a-fA-F0-9:]+)"), timeout=30)[0]
-
- # Starting a py-client in a separate thread
- thread1 = Thread(target=bleprph_client_task, args=(dut_addr,dut,))
- thread1.start()
- thread1.join()
-
- # Check dut responses
- dut.expect("connection established; status=0", timeout=30)
- dut.expect("disconnect;", timeout=30)
- except Exception as e:
- sys.exit(e)
+ # Check dut responses
+ dut.expect("connection established; status=0", timeout=30)
+ dut.expect("disconnect;", timeout=30)
if __name__ == '__main__':
from __future__ import print_function
import sys
import time
+import traceback
try:
from future.moves.itertools import zip_longest
srv_added_old_cnt = 0
srv_added_new_cnt = 0
+verify_signal_check = 0
blecent_retry_check_cnt = 0
verify_service_cnt = 0
verify_readchars_cnt = 0
blecent_adv_uuid = '1811'
-iface_added = False
gatt_app_obj_check = False
gatt_app_reg_check = False
adv_data_check = False
ble_hr_chrc = False
DISCOVERY_START = False
+SIGNAL_CAUGHT = False
TEST_CHECKS_PASS = False
ADV_STOP = False
event_loop = GLib.MainLoop()
+def verify_signal_is_caught():
+ global verify_signal_check
+ verify_signal_check += 1
+
+ if (not SIGNAL_CAUGHT and verify_signal_check == 15) or (SIGNAL_CAUGHT):
+ if event_loop.is_running():
+ event_loop.quit()
+ return False # polling for checks will stop
+
+ return True # polling will continue
+
+
def set_props_status(props):
"""
Set Adapter status if it is powered on or off
"""
global ADAPTER_ON, SERVICES_RESOLVED, GATT_OBJ_REMOVED, GATT_APP_REGISTERED, \
- ADV_REGISTERED, ADV_ACTIVE_INSTANCE, DEVICE_CONNECTED, CHRC_VALUE, CHRC_VALUE_CNT
+ ADV_REGISTERED, ADV_ACTIVE_INSTANCE, DEVICE_CONNECTED, CHRC_VALUE, CHRC_VALUE_CNT, \
+ SIGNAL_CAUGHT
is_service_uuid = False
# Signal caught for change in Adapter Powered property
if 'Powered' in props:
if props['Powered'] == 1:
+ SIGNAL_CAUGHT = True
ADAPTER_ON = True
else:
+ SIGNAL_CAUGHT = True
ADAPTER_ON = False
- event_loop.quit()
- elif 'ServicesResolved' in props:
+ if 'ServicesResolved' in props:
if props['ServicesResolved'] == 1:
+ SIGNAL_CAUGHT = True
SERVICES_RESOLVED = True
else:
+ SIGNAL_CAUGHT = True
SERVICES_RESOLVED = False
- elif 'UUIDs' in props:
+ if 'UUIDs' in props:
# Signal caught for add/remove GATT data having service uuid
for uuid in props['UUIDs']:
if blecent_adv_uuid in uuid:
# and for unregistering GATT application
GATT_APP_REGISTERED = False
lib_gatt.GATT_APP_OBJ = False
- elif 'ActiveInstances' in props:
+ if 'ActiveInstances' in props:
# Signal caught for Advertising - add/remove Instances property
if props['ActiveInstances'] == 1:
ADV_ACTIVE_INSTANCE = True
ADV_ACTIVE_INSTANCE = False
ADV_REGISTERED = False
lib_gap.ADV_OBJ = False
- elif 'Connected' in props:
+ if 'Connected' in props:
# Signal caught for device connect/disconnect
- if props['Connected'] is True:
+ if props['Connected'] == 1:
+ SIGNAL_CAUGHT = True
DEVICE_CONNECTED = True
- event_loop.quit()
else:
+ SIGNAL_CAUGHT = True
DEVICE_CONNECTED = False
- elif 'Value' in props:
+ if 'Value' in props:
# Signal caught for change in chars value
if ble_hr_chrc:
CHRC_VALUE_CNT += 1
print(props['Value'])
if CHRC_VALUE_CNT == 10:
- event_loop.quit()
+ SIGNAL_CAUGHT = True
+ return False
+
+ return False
def props_change_handler(iface, changed_props, invalidated):
Discover Bluetooth Adapter
Power On Bluetooth Adapter
'''
+ global verify_signal_check, SIGNAL_CAUGHT, ADAPTER_ON
+ verify_signal_check = 0
+ ADAPTER_ON = False
try:
print("discovering adapter...")
for path, interfaces in self.ble_objs.items():
break
if self.adapter is None:
- raise RuntimeError("\nError: bluetooth adapter not found")
+ raise Exception("Bluetooth adapter not found")
if self.props_iface_obj is None:
- raise RuntimeError("\nError: properties interface not found")
+ raise Exception("Properties interface not found")
print("bluetooth adapter discovered")
- self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
-
# Check if adapter is already powered on
if ADAPTER_ON:
- print("bluetooth adapter is already on")
+ print("Adapter already powered on")
return True
# Power On Adapter
print("powering on adapter...")
+ self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(1))
+ SIGNAL_CAUGHT = False
+ GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run()
if ADAPTER_ON:
print("bluetooth adapter powered on")
return True
else:
- print("Failure: bluetooth adapter not powered on")
- return False
+ raise Exception("Failure: bluetooth adapter not powered on")
- except Exception as e:
- print(e)
- sys.exit(1)
+ except Exception:
+ print(traceback.format_exc())
+ return False
def connect(self):
'''
Connect to the device discovered
Retry 10 times to discover and connect to device
'''
- global DISCOVERY_START
-
+ global DISCOVERY_START, SIGNAL_CAUGHT, DEVICE_CONNECTED, verify_signal_check
device_found = False
+ DEVICE_CONNECTED = False
try:
self.adapter.StartDiscovery()
print("\nStarted Discovery")
DISCOVERY_START = True
for retry_cnt in range(10,0,-1):
+ verify_signal_check = 0
try:
if self.device is None:
print("\nConnecting to device...")
device_found = self.get_device()
if device_found:
self.device.Connect(dbus_interface=DEVICE_IFACE)
- event_loop.quit()
- print("\nConnected to device")
- return True
+ time.sleep(15)
+ SIGNAL_CAUGHT = False
+ GLib.timeout_add_seconds(5, verify_signal_is_caught)
+ event_loop.run()
+ if DEVICE_CONNECTED:
+ print("\nConnected to device")
+ return True
+ else:
+ raise Exception
except Exception as e:
print(e)
print("\nRetries left", retry_cnt - 1)
# Device not found
return False
- except Exception as e:
- print(e)
+ except Exception:
+ print(traceback.format_exc())
self.device = None
return False
break
if dev_path is None:
- print("\nBLE device not found")
- return False
+ raise Exception("\nBLE device not found")
device_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, dev_path), DBUS_PROP_IFACE)
device_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
if path not in self.services:
self.services.append(path)
- def verify_get_services(self):
- global SERVICE_SCAN_FAIL, verify_service_cnt
- verify_service_cnt += 1
- if iface_added and self.services and SERVICES_RESOLVED:
- event_loop.quit()
-
- if verify_service_cnt == 10:
- event_loop.quit()
-
- def verify_service_uuid_found(self):
+ def verify_service_uuid_found(self, service_uuid):
'''
Verify service uuid found
'''
if srv_uuid_found:
SERVICE_UUID_FOUND = True
- def get_services(self, srv_uuid=None):
+ def get_services(self, service_uuid=None):
'''
Retrieve Services found in the device connected
'''
- global service_uuid, iface_added, SERVICE_UUID_FOUND
- service_uuid = srv_uuid
- iface_added = False
+ global SIGNAL_CAUGHT, SERVICE_UUID_FOUND, SERVICES_RESOLVED, verify_signal_check
+ verify_signal_check = 0
SERVICE_UUID_FOUND = False
+ SERVICES_RESOLVED = False
+ SIGNAL_CAUGHT = False
+
try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.ble_objs = om_iface_obj.GetManagedObjects()
-
for path, interfaces in self.ble_objs.items():
self.srvc_iface_added_handler(path, interfaces)
# If services not found, then they may not have been added yet on dbus
if not self.services:
- iface_added = True
- GLib.timeout_add_seconds(2, self.verify_get_services)
+ GLib.timeout_add_seconds(5, verify_signal_is_caught)
om_iface_obj.connect_to_signal('InterfacesAdded', self.srvc_iface_added_handler)
event_loop.run()
+ if not SERVICES_RESOLVED:
+ raise Exception("Services not found...")
+
if service_uuid:
- self.verify_service_uuid_found()
+ self.verify_service_uuid_found(service_uuid)
if not SERVICE_UUID_FOUND:
- raise Exception("Service with uuid: %s not found !!!" % service_uuid)
+ raise Exception("Service with uuid: %s not found..." % service_uuid)
+
+ # Services found
return self.srv_uuid
- except Exception as e:
- print("Error: ", e)
+ except Exception:
+ print(traceback.format_exc())
return False
def chrc_iface_added_handler(self, path, interfaces):
'''
Add services found as lib_ble_client obj
'''
- global chrc, chrc_discovered
+ global chrc, chrc_discovered, SIGNAL_CAUGHT
chrc_val = None
if self.device and path.startswith(self.device.object_path):
chrc_val = chrc.ReadValue({}, dbus_interface=GATT_CHRC_IFACE)
uuid = chrc_props['UUID']
self.chars[path] = chrc_val, chrc_flags, uuid
-
- def verify_get_chars(self):
- global verify_readchars_cnt
- verify_readchars_cnt += 1
- if iface_added and self.chars:
- event_loop.quit()
- if verify_readchars_cnt == 10:
- event_loop.quit()
+ SIGNAL_CAUGHT = True
def read_chars(self):
'''
Read characteristics found in the device connected
'''
- global iface_added, chrc_discovered
+ global iface_added, chrc_discovered, SIGNAL_CAUGHT, verify_signal_check
+ verify_signal_check = 0
chrc_discovered = False
iface_added = False
+ SIGNAL_CAUGHT = False
try:
om_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, "/"), DBUS_OM_IFACE)
self.chrc_iface_added_handler(path, interfaces)
# If chars not found, then they have not been added yet to interface
+ time.sleep(15)
if not self.chars:
iface_added = True
- GLib.timeout_add_seconds(2, self.verify_get_chars)
+ GLib.timeout_add_seconds(5, verify_signal_is_caught)
om_iface_obj.connect_to_signal('InterfacesAdded', self.chars_iface_added_handler)
event_loop.run()
return self.chars
- except Exception as e:
- print("Error: ", e)
+ except Exception:
+ print(traceback.format_exc())
return False
def write_chars(self, write_val):
return False
self.chars[path] = chrc_val, props[1], props[2] # update value
if not char_write_props:
- print("Failure: Cannot perform write operation. Characteristic does not have write permission.")
- return False
+ raise Exception("Failure: Cannot perform write operation. Characteristic does not have write permission.")
return self.chars
- except Exception as e:
- print(e)
+ except Exception:
+ print(traceback.format_exc())
return False
def hr_update_simulation(self, hr_srv_uuid, hr_char_uuid):
Retrieve updated value
Stop Notifications
'''
- global ble_hr_chrc
+ global ble_hr_chrc, verify_signal_check, SIGNAL_CAUGHT, CHRC_VALUE_CNT
srv_path = None
chrc = None
chrc_path = None
chars_ret = None
ble_hr_chrc = True
+ CHRC_VALUE_CNT = 0
try:
# Get HR Measurement characteristic
break
if srv_path is None:
- print("Failure: HR UUID:", hr_srv_uuid, "not found")
- return False
+ raise Exception("Failure: HR UUID:", hr_srv_uuid, "not found")
chars_ret = self.read_chars()
if hr_char_uuid in props[2]: # uuid
break
if chrc is None:
- print("Failure: Characteristics for service: ", srv_path, "not found")
- return False
+ raise Exception("Failure: Characteristics for service: ", srv_path, "not found")
+
# Subscribe to notifications
print("\nSubscribe to notifications: On")
chrc.StartNotify(dbus_interface=GATT_CHRC_IFACE)
chrc_props_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, chrc_path), DBUS_PROP_IFACE)
chrc_props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
+ SIGNAL_CAUGHT = False
+ verify_signal_check = 0
+ GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run()
chrc.StopNotify(dbus_interface=GATT_CHRC_IFACE)
time.sleep(2)
ble_hr_chrc = False
return True
- except Exception as e:
- print(e)
+ except Exception:
+ print(traceback.format_exc())
return False
def create_gatt_app(self):
Create GATT data
Register GATT Application
'''
- global gatt_app_obj, gatt_manager_iface_obj
+ global gatt_app_obj, gatt_manager_iface_obj, GATT_APP_REGISTERED
gatt_app_obj = None
gatt_manager_iface_obj = None
+ GATT_APP_REGISTERED = False
+ lib_gatt.GATT_APP_OBJ = False
try:
gatt_app_obj = lib_gatt.Application(self.bus, self.adapter_path[0])
reply_handler=self.gatt_app_handler,
error_handler=self.gatt_app_error_handler)
return True
- except Exception as e:
- print(e)
+ except Exception:
+ print(traceback.format_exc())
return False
def gatt_app_handler(self):
Register Advertisement
Start Advertising
'''
- global le_adv_obj, le_adv_manager_iface_obj
+ global le_adv_obj, le_adv_manager_iface_obj, ADV_ACTIVE_INSTANCE, ADV_REGISTERED
+
le_adv_obj = None
le_adv_manager_iface_obj = None
le_adv_iface_path = None
+ ADV_ACTIVE_INSTANCE = False
+ ADV_REGISTERED = False
+ lib_gap.ADV_OBJ = False
try:
print("Advertising started")
gatt_app_ret = self.create_gatt_app()
if not gatt_app_ret:
- return False
+ raise Exception
for path,interface in self.ble_objs.items():
if LE_ADVERTISING_MANAGER_IFACE in interface:
le_adv_iface_path = path
if le_adv_iface_path is None:
- print('\n Cannot start advertising. LEAdvertisingManager1 Interface not found')
- return False
+ raise Exception('\n Cannot start advertising. LEAdvertisingManager1 Interface not found')
le_adv_obj = lib_gap.Advertisement(self.bus, adv_iface_index, adv_type, adv_uuid, adv_host_name)
le_adv_manager_iface_obj = dbus.Interface(self.bus.get_object(BLUEZ_SERVICE_NAME, le_adv_iface_path), LE_ADVERTISING_MANAGER_IFACE)
if TEST_CHECKS_PASS:
return True
else:
- return False
+ raise Exception
- except Exception as e:
- print("in Exception")
- print(e)
+ except Exception:
+ print(traceback.format_exc())
return False
def adv_handler(self):
TEST_CHECKS_PASS = False
if subscribe_req_check:
lib_gatt.alert_status_char_obj.StopNotify()
- event_loop.quit()
- return False # polling for checks will stop
-
- # Check for success
- if not gatt_app_obj_check and lib_gatt.GATT_APP_OBJ:
- print("GATT Data created")
- gatt_app_obj_check = True
- if not gatt_app_reg_check and GATT_APP_REGISTERED:
- print("GATT Application registered")
- gatt_app_reg_check = True
- if not adv_data_check and lib_gap.ADV_OBJ:
- print("Advertising data created")
- adv_data_check = True
- if not adv_reg_check and ADV_REGISTERED and ADV_ACTIVE_INSTANCE:
- print("Advertisement registered")
- adv_reg_check = True
- if not read_req_check and lib_gatt.CHAR_READ:
- read_req_check = True
- if not write_req_check and lib_gatt.CHAR_WRITE:
- write_req_check = True
- if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE:
- subscribe_req_check = True
+ else:
+ # Check for success
+ if not gatt_app_obj_check and lib_gatt.GATT_APP_OBJ:
+ print("GATT Data created")
+ gatt_app_obj_check = True
+ if not gatt_app_reg_check and GATT_APP_REGISTERED:
+ print("GATT Application registered")
+ gatt_app_reg_check = True
+ if not adv_data_check and lib_gap.ADV_OBJ:
+ print("Advertising data created")
+ adv_data_check = True
+ if not adv_reg_check and ADV_REGISTERED and ADV_ACTIVE_INSTANCE:
+ print("Advertisement registered")
+ adv_reg_check = True
+ if not read_req_check and lib_gatt.CHAR_READ:
+ read_req_check = True
+ if not write_req_check and lib_gatt.CHAR_WRITE:
+ write_req_check = True
+ if not subscribe_req_check and lib_gatt.CHAR_SUBSCRIBE:
+ subscribe_req_check = True
- # Increment retry count
- blecent_retry_check_cnt += 1
if read_req_check and write_req_check and subscribe_req_check:
# all checks passed
# Blecent Test passed
TEST_CHECKS_PASS = True
lib_gatt.alert_status_char_obj.StopNotify()
- event_loop.quit()
+
+ if (blecent_retry_check_cnt == 10 or TEST_CHECKS_PASS):
+ if event_loop.is_running():
+ event_loop.quit()
return False # polling for checks will stop
+ # Increment retry count
+ blecent_retry_check_cnt += 1
+
# Default return True - polling for checks will continue
return True
# Blecent Test failed
ADV_STOP = False
- event_loop.quit()
+ else:
+ # Check for success
+ if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ:
+ print("GATT Data removed")
+ gatt_app_obj_check = True
+ if not gatt_app_reg_check and not GATT_APP_REGISTERED:
+ print("GATT Application unregistered")
+ gatt_app_reg_check = True
+ if not adv_data_check and not adv_reg_check and not (ADV_REGISTERED or ADV_ACTIVE_INSTANCE or lib_gap.ADV_OBJ):
+ print("Advertising data removed")
+ print("Advertisement unregistered")
+ adv_data_check = True
+ adv_reg_check = True
+ # all checks passed
+ ADV_STOP = True
+
+ if (blecent_retry_check_cnt == 10 or ADV_STOP):
+ if event_loop.is_running():
+ event_loop.quit()
return False # polling for checks will stop
- # Check for success
- if not gatt_app_obj_check and not lib_gatt.GATT_APP_OBJ:
- print("GATT Data removed")
- gatt_app_obj_check = True
- if not gatt_app_reg_check and not GATT_APP_REGISTERED:
- print("GATT Application unregistered")
- gatt_app_reg_check = True
- if not adv_data_check and not adv_reg_check and not (ADV_REGISTERED or ADV_ACTIVE_INSTANCE or lib_gap.ADV_OBJ):
- print("Advertising data removed")
- print("Advertisement unregistered")
- adv_data_check = True
- adv_reg_check = True
-
# Increment retry count
blecent_retry_check_cnt += 1
- if adv_reg_check:
- # all checks passed
- ADV_STOP = True
- event_loop.quit()
- return False # polling for checks will stop
# Default return True - polling for checks will continue
return True
Adapter is powered off
'''
try:
- global blecent_retry_check_cnt, DISCOVERY_START
+ global blecent_retry_check_cnt, DISCOVERY_START, verify_signal_check, SIGNAL_CAUGHT
blecent_retry_check_cnt = 0
+ verify_signal_check = 0
print("\nexiting from test...")
+ self.props_iface_obj.connect_to_signal('PropertiesChanged', props_change_handler)
+
if ADV_REGISTERED:
# Unregister Advertisement
le_adv_manager_iface_obj.UnregisterAdvertisement(le_adv_obj.get_path())
# Remove GATT data
dbus.service.Object.remove_from_connection(gatt_app_obj)
- GLib.timeout_add_seconds(2, self.verify_blecent_disconnect)
+ GLib.timeout_add_seconds(5, self.verify_blecent_disconnect)
event_loop.run()
if ADV_STOP:
if DISCOVERY_START:
self.adapter.StopDiscovery()
DISCOVERY_START = False
+ time.sleep(15)
- # Power off Adapter
- self.props_iface_obj.Set(ADAPTER_IFACE, "Powered", dbus.Boolean(0))
+ SIGNAL_CAUGHT = False
+ GLib.timeout_add_seconds(5, verify_signal_is_caught)
event_loop.run()
if not DEVICE_CONNECTED:
else:
print("Warning: device could not be disconnected")
- print("powering off adapter...")
- if not ADAPTER_ON:
- print("bluetooth adapter powered off")
- else:
- print("\nWarning: Bluetooth adapter not powered off")
-
- except Exception as e:
- print(e)
+ except Exception:
+ print(traceback.format_exc())
return False