from __future__ import print_function
from builtins import input
+from future.utils import iteritems
import platform
# BLE client (Linux Only) using Bluez and DBus
class BLE_Bluez_Client:
- def connect(self, devname, iface, srv_uuid):
+ def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
self.devname = devname
- self.srv_uuid = srv_uuid
+ self.srv_uuid_fallback = fallback_srv_uuid
+ self.chrc_names = [name.lower() for name in chrc_names]
self.device = None
self.adapter = None
self.adapter_props = None
self.services = None
+ self.nu_lookup = None
+ self.characteristics = dict()
+ self.srv_uuid_adv = None
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = manager.GetManagedObjects()
- for path, interfaces in objects.items():
+ for path, interfaces in iteritems(objects):
adapter = interfaces.get("org.bluez.Adapter1")
if adapter is not None:
if path.endswith(iface):
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = manager.GetManagedObjects()
dev_path = None
- for path, interfaces in objects.items():
- if "org.bluez.Device1" not in interfaces.keys():
+ for path, interfaces in iteritems(objects):
+ if "org.bluez.Device1" not in interfaces:
continue
if interfaces["org.bluez.Device1"].get("Name") == self.devname:
dev_path = path
try:
self.device = bus.get_object("org.bluez", dev_path)
+ try:
+ uuids = self.device.Get('org.bluez.Device1', 'UUIDs',
+ dbus_interface='org.freedesktop.DBus.Properties')
+ # There should be 1 service UUID in advertising data
+ # If bluez had cached an old version of the advertisement data
+ # the list of uuids may be incorrect, in which case connection
+ # or service discovery may fail the first time. If that happens
+ # the cache will be refreshed before next retry
+ if len(uuids) == 1:
+ self.srv_uuid_adv = uuids[0]
+ except dbus.exceptions.DBusException as e:
+ print(e)
+
self.device.Connect(dbus_interface='org.bluez.Device1')
except Exception as e:
print(e)
bus = dbus.SystemBus()
manager = dbus.Interface(bus.get_object("org.bluez", "/"), "org.freedesktop.DBus.ObjectManager")
objects = manager.GetManagedObjects()
- srv_path = None
- for path, interfaces in objects.items():
- if "org.bluez.GattService1" not in interfaces.keys():
+ service_found = False
+ for srv_path, srv_interfaces in iteritems(objects):
+ if "org.bluez.GattService1" not in srv_interfaces:
+ continue
+ if not srv_path.startswith(self.device.object_path):
continue
- if path.startswith(self.device.object_path):
- service = bus.get_object("org.bluez", path)
- uuid = service.Get('org.bluez.GattService1', 'UUID',
+ service = bus.get_object("org.bluez", srv_path)
+ srv_uuid = service.Get('org.bluez.GattService1', 'UUID',
dbus_interface='org.freedesktop.DBus.Properties')
- if uuid == self.srv_uuid:
- srv_path = path
+
+ # If service UUID doesn't match the one found in advertisement data
+ # then also check if it matches the fallback UUID
+ if srv_uuid not in [self.srv_uuid_adv, self.srv_uuid_fallback]:
+ continue
+
+ nu_lookup = dict()
+ characteristics = dict()
+ for chrc_path, chrc_interfaces in iteritems(objects):
+ if "org.bluez.GattCharacteristic1" not in chrc_interfaces:
+ continue
+ if not chrc_path.startswith(service.object_path):
+ continue
+ chrc = bus.get_object("org.bluez", chrc_path)
+ uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
+ dbus_interface='org.freedesktop.DBus.Properties')
+ characteristics[uuid] = chrc
+ for desc_path, desc_interfaces in iteritems(objects):
+ if "org.bluez.GattDescriptor1" not in desc_interfaces:
+ continue
+ if not desc_path.startswith(chrc.object_path):
+ continue
+ desc = bus.get_object("org.bluez", desc_path)
+ desc_uuid = desc.Get('org.bluez.GattDescriptor1', 'UUID',
+ dbus_interface='org.freedesktop.DBus.Properties')
+ if desc_uuid[4:8] != '2901':
+ continue
+ try:
+ readval = desc.ReadValue({}, dbus_interface='org.bluez.GattDescriptor1')
+ except dbus.exceptions.DBusException:
+ break
+ found_name = ''.join(chr(b) for b in readval).lower()
+ nu_lookup[found_name] = uuid
break
- if srv_path is None:
+ match_found = True
+ for name in self.chrc_names:
+ if name not in nu_lookup:
+ # Endpoint name not present
+ match_found = False
+ break
+
+ # Create lookup table only if all endpoint names found
+ self.nu_lookup = [None, nu_lookup][match_found]
+ self.characteristics = characteristics
+ service_found = True
+
+ # If the service UUID matches that in the advertisement
+ # we can stop the search now. If it doesn't match, we
+ # have found the service corresponding to the fallback
+ # UUID, in which case don't break and keep searching
+ # for the advertised service
+ if srv_uuid == self.srv_uuid_adv:
+ break
+
+ if not service_found:
self.device.Disconnect(dbus_interface='org.bluez.Device1')
+ if self.adapter:
+ self.adapter.RemoveDevice(self.device)
self.device = None
+ self.nu_lookup = None
+ self.characteristics = dict()
raise RuntimeError("Provisioning service not found")
- self.characteristics = dict()
- for path, interfaces in objects.items():
- if "org.bluez.GattCharacteristic1" not in interfaces.keys():
- continue
- if path.startswith(srv_path):
- chrc = bus.get_object("org.bluez", path)
- uuid = chrc.Get('org.bluez.GattCharacteristic1', 'UUID',
- dbus_interface='org.freedesktop.DBus.Properties')
- self.characteristics[uuid] = chrc
+ def get_nu_lookup(self):
+ return self.nu_lookup
def has_characteristic(self, uuid):
- if uuid in self.characteristics.keys():
+ if uuid in self.characteristics:
return True
return False
if self.adapter:
self.adapter.RemoveDevice(self.device)
self.device = None
+ self.nu_lookup = None
+ self.characteristics = dict()
if self.adapter_props:
self.adapter_props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(0))
# Console based BLE client for Cross Platform support
class BLE_Console_Client:
- def connect(self, devname, iface, srv_uuid):
+ def connect(self, devname, iface, chrc_names, fallback_srv_uuid):
print("BLE client is running in console mode")
print("\tThis could be due to your platform not being supported or dependencies not being met")
print("\tPlease ensure all pre-requisites are met to run the full fledged client")
if resp != 'Y' and resp != 'y':
return False
print("BLECLI >> List available attributes of the connected device")
- resp = input("BLECLI >> Is the service UUID '" + srv_uuid + "' listed among available attributes? [y/n] ")
+ resp = input("BLECLI >> Is the service UUID '" + fallback_srv_uuid + "' listed among available attributes? [y/n] ")
if resp != 'Y' and resp != 'y':
return False
return True
+ def get_nu_lookup(self):
+ return None
+
def has_characteristic(self, uuid):
resp = input("BLECLI >> Is the characteristic UUID '" + uuid + "' listed among available attributes? [y/n] ")
if resp != 'Y' and resp != 'y':
# Calculate characteristic UUID for each endpoint
nu_lookup[name] = service_uuid[:4] + '{:02x}'.format(
int(nu_lookup[name], 16) & int(service_uuid[4:8], 16)) + service_uuid[8:]
- self.name_uuid_lookup = nu_lookup
# Get BLE client module
self.cli = ble_cli.get_client()
# Use client to connect to BLE device and bind to service
- if not self.cli.connect(devname=devname, iface='hci0', srv_uuid=service_uuid):
+ if not self.cli.connect(devname=devname, iface='hci0',
+ chrc_names=nu_lookup.keys(),
+ fallback_srv_uuid=service_uuid):
raise RuntimeError("Failed to initialize transport")
- # Check if expected characteristics are provided by the service
- for name in self.name_uuid_lookup.keys():
- if not self.cli.has_characteristic(self.name_uuid_lookup[name]):
- raise RuntimeError("'" + name + "' endpoint not found")
+ # Irrespective of provided parameters, let the client
+ # generate a lookup table by reading advertisement data
+ # and characteristic user descriptors
+ self.name_uuid_lookup = self.cli.get_nu_lookup()
+
+ # If that doesn't work, use the lookup table provided as parameter
+ if self.name_uuid_lookup is None:
+ self.name_uuid_lookup = nu_lookup
+ # Check if expected characteristics are provided by the service
+ for name in self.name_uuid_lookup.keys():
+ if not self.cli.has_characteristic(self.name_uuid_lookup[name]):
+ raise RuntimeError("'" + name + "' endpoint not found")
def __del__(self):
# Make sure device is disconnected before application gets closed