3 # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
# Create a GATT application that is then made available to remote devices.
20 from __future__ import print_function
26 except ImportError as e:
27 if 'linux' not in sys.platform:
28 sys.exit("Error: Only supported on Linux platform")
30 print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue")
31 print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue")
34 alert_status_char_obj = None
39 CHAR_SUBSCRIBE = False
41 DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
42 DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
43 GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
44 GATT_SERVICE_IFACE = 'org.bluez.GattService1'
45 GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
46 GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
class InvalidArgsException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the standard ``InvalidArgs`` error name.

    Raised by the ``GetAll`` handlers in this file when the requested
    interface name is not the one the object implements.
    """
    # Error name associated with this exception class.
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class NotSupportedException(dbus.exceptions.DBusException):
    """D-Bus exception carrying the BlueZ ``NotSupported`` error name.

    Raised by the default ``ReadValue``/``WriteValue``/``StartNotify``/
    ``StopNotify`` handlers in this file.
    """
    # Error name associated with this exception class.
    _dbus_error_name = 'org.bluez.Error.NotSupported'
57 class Application(dbus.service.Object):
59 org.bluez.GattApplication1 interface implementation
62 def __init__(self, bus, path):
65 srv_obj = AlertNotificationService(bus, '0001')
66 self.add_service(srv_obj)
67 dbus.service.Object.__init__(self, bus, self.path)
73 return dbus.ObjectPath(self.path)
def add_service(self, service):
    """Append ``service`` to the list of services held by this application."""
    self.services.append(service)
78 @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
79 def GetManagedObjects(self):
83 for service in self.services:
84 response[service.get_path()] = service.get_properties()
85 chrcs = service.get_characteristics()
87 response[chrc.get_path()] = chrc.get_properties()
88 descs = chrc.get_descriptors()
90 response[desc.get_path()] = desc.get_properties()
99 class Service(dbus.service.Object):
101 org.bluez.GattService1 interface implementation
103 PATH_BASE = '/org/bluez/hci0/service'
105 def __init__(self, bus, index, uuid, primary=False):
106 self.path = self.PATH_BASE + str(index)
109 self.primary = primary
110 self.characteristics = []
111 dbus.service.Object.__init__(self, bus, self.path)
113 def get_properties(self):
115 GATT_SERVICE_IFACE: {
117 'Primary': self.primary,
118 'Characteristics': dbus.Array(
119 self.get_characteristic_paths(),
125 return dbus.ObjectPath(self.path)
def add_characteristic(self, characteristic):
    """Append ``characteristic`` to this service's characteristic list."""
    self.characteristics.append(characteristic)
130 def get_characteristic_paths(self):
132 for chrc in self.characteristics:
133 result.append(chrc.get_path())
def get_characteristics(self):
    """Return the list of characteristics added to this service.

    The list object itself is returned, not a copy.
    """
    return self.characteristics
139 @dbus.service.method(DBUS_PROP_IFACE,
141 out_signature='a{sv}')
142 def GetAll(self, interface):
143 if interface != GATT_SERVICE_IFACE:
144 raise InvalidArgsException()
145 return self.get_properties()[GATT_SERVICE_IFACE]
148 class Characteristic(dbus.service.Object):
150 org.bluez.GattCharacteristic1 interface implementation
152 def __init__(self, bus, index, uuid, flags, service):
153 self.path = service.path + '/char' + str(index)
156 self.service = service
159 self.descriptors = []
160 dbus.service.Object.__init__(self, bus, self.path)
162 def get_properties(self):
165 'Service': self.service.get_path(),
169 'Descriptors': dbus.Array(self.get_descriptor_paths(), signature='o')
175 return dbus.ObjectPath(self.path)
def add_descriptor(self, descriptor):
    """Append ``descriptor`` to this characteristic's descriptor list."""
    self.descriptors.append(descriptor)
180 def get_descriptor_paths(self):
182 for desc in self.descriptors:
183 result.append(desc.get_path())
def get_descriptors(self):
    """Return the list of descriptors added to this characteristic.

    The list object itself is returned, not a copy.
    """
    return self.descriptors
@dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
def GetAll(self, interface):
    """Return the characteristic's property dictionary.

    Args:
        interface: Name of the interface whose properties are requested.

    Returns:
        The ``GATT_CHRC_IFACE`` entry of ``self.get_properties()``.

    Raises:
        InvalidArgsException: if ``interface`` is not ``GATT_CHRC_IFACE``.
    """
    if interface == GATT_CHRC_IFACE:
        all_props = self.get_properties()
        return all_props[GATT_CHRC_IFACE]

    # Any other interface name is rejected.
    raise InvalidArgsException()
@dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
def ReadValue(self, options):
    """Default read handler: log the call and reject it.

    The characteristic subclasses in this file define their own
    ``ReadValue``; this base version never returns a value.

    Raises:
        NotSupportedException: always.
    """
    print('\nDefault ReadValue called, returning error')
    raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
    """Default write handler: log the call and reject it.

    ``value`` and ``options`` are accepted but not used here.

    Raises:
        NotSupportedException: always.
    """
    print('\nDefault WriteValue called, returning error')
    raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StartNotify(self):
    """Default notification-start handler: log the call and reject it.

    ``UnreadAlertStatusCharacteristic`` in this file defines its own
    ``StartNotify``.

    Raises:
        NotSupportedException: always.
    """
    print('Default StartNotify called, returning error')
    raise NotSupportedException()
@dbus.service.method(GATT_CHRC_IFACE)
def StopNotify(self):
    """Default notification-stop handler: log the call and reject it.

    ``UnreadAlertStatusCharacteristic`` in this file defines its own
    ``StopNotify``.

    Raises:
        NotSupportedException: always.
    """
    print('Default StopNotify called, returning error')
    raise NotSupportedException()
@dbus.service.signal(DBUS_PROP_IFACE,
                     signature='sa{sv}as')
def PropertiesChanged(self, interface, changed, invalidated):
    """Signal declared on ``DBUS_PROP_IFACE`` with signature ``sa{sv}as``.

    The Python body only prints a log line; the three arguments are not
    used inside it.
    """
    print("\nProperties Changed")
222 class Descriptor(dbus.service.Object):
224 org.bluez.GattDescriptor1 interface implementation
226 def __init__(self, bus, index, uuid, flags, characteristic):
227 self.path = characteristic.path + '/desc' + str(index)
231 self.chrc = characteristic
232 dbus.service.Object.__init__(self, bus, self.path)
234 def get_properties(self):
237 'Characteristic': self.chrc.get_path(),
244 return dbus.ObjectPath(self.path)
246 @dbus.service.method(DBUS_PROP_IFACE,
248 out_signature='a{sv}')
249 def GetAll(self, interface):
250 if interface != GATT_DESC_IFACE:
251 raise InvalidArgsException()
253 return self.get_properties()[GATT_DESC_IFACE]
@dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
def ReadValue(self, options):
    """Default descriptor read handler: log the call and reject it.

    ``options`` is accepted but not used here.

    Raises:
        NotSupportedException: always.
    """
    print('Default ReadValue called, returning error')
    raise NotSupportedException()
@dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
def WriteValue(self, value, options):
    """Default descriptor write handler: log the call and reject it.

    ``ClientCharacteristicConfigurationDescriptor`` in this file defines
    its own ``WriteValue`` taking only ``value``.

    Raises:
        NotSupportedException: always.
    """
    print('Default WriteValue called, returning error')
    raise NotSupportedException()
class AlertNotificationService(Service):
    """Primary GATT service that registers three alert characteristics.

    On construction it adds, in order, a
    ``SupportedNewAlertCategoryCharacteristic`` ('0001'), an
    ``AlertNotificationControlPointCharacteristic`` ('0002') and an
    ``UnreadAlertStatusCharacteristic`` ('0003').
    """
    TEST_SVC_UUID = '00001811-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index):
        """Create the service and attach its characteristics.

        Args:
            bus: Bus object forwarded to ``Service`` and every characteristic.
            index: Service index forwarded to ``Service.__init__``.

        Side effects:
            Stores the ``UnreadAlertStatusCharacteristic`` instance in the
            module-level ``alert_status_char_obj``.
        """
        global alert_status_char_obj
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, primary=True)

        # The first two characteristics need no further handling once added.
        plain_chrcs = (
            (SupportedNewAlertCategoryCharacteristic, '0001'),
            (AlertNotificationControlPointCharacteristic, '0002'),
        )
        for chrc_cls, chrc_index in plain_chrcs:
            self.add_characteristic(chrc_cls(bus, chrc_index, self))

        # The unread-alert-status characteristic is also published through
        # the module-level global before it is added to the service.
        unread_status_chrc = UnreadAlertStatusCharacteristic(bus, '0003', self)
        alert_status_char_obj = unread_status_chrc
        self.add_characteristic(unread_status_chrc)
278 class SupportedNewAlertCategoryCharacteristic(Characteristic):
279 SUPPORT_NEW_ALERT_UUID = '00002A47-0000-1000-8000-00805f9b34fb'
281 def __init__(self, bus, index, service):
282 Characteristic.__init__(
284 self.SUPPORT_NEW_ALERT_UUID,
288 self.value = [dbus.Byte(2)]
290 def ReadValue(self, options):
294 for val in self.value:
295 val_list.append(dbus.Byte(val))
296 print("Read Request received\n", "\tSupportedNewAlertCategoryCharacteristic")
297 print("\tValue:", "\t", val_list)
301 class AlertNotificationControlPointCharacteristic(Characteristic):
302 ALERT_NOTIF_UUID = '00002A44-0000-1000-8000-00805f9b34fb'
304 def __init__(self, bus, index, service):
305 Characteristic.__init__(
307 self.ALERT_NOTIF_UUID,
311 self.value = [dbus.Byte(0)]
313 def ReadValue(self, options):
315 for val in self.value:
316 val_list.append(dbus.Byte(val))
317 print("Read Request received\n", "\tAlertNotificationControlPointCharacteristic")
318 print("\tValue:", "\t", val_list)
321 def WriteValue(self, value, options):
324 print("Write Request received\n", "\tAlertNotificationControlPointCharacteristic")
325 print("\tCurrent value:", "\t", self.value)
329 self.value = val_list
330 # Check if new value is written
331 print("\tNew value:", "\t", self.value)
332 if not self.value == value:
333 print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
336 class UnreadAlertStatusCharacteristic(Characteristic):
337 UNREAD_ALERT_STATUS_UUID = '00002A45-0000-1000-8000-00805f9b34fb'
339 def __init__(self, bus, index, service):
340 Characteristic.__init__(
342 self.UNREAD_ALERT_STATUS_UUID,
343 ['read', 'write', 'notify'],
345 self.value = [dbus.Byte(0)]
346 self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
347 self.add_descriptor(self.cccd_obj)
348 self.notifying = False
350 def StartNotify(self):
351 global CHAR_SUBSCRIBE
352 CHAR_SUBSCRIBE = True
355 print('\nAlready notifying, nothing to do')
357 self.notifying = True
358 print("\nNotify Started")
359 self.cccd_obj.WriteValue([dbus.Byte(1), dbus.Byte(0)])
360 self.cccd_obj.ReadValue()
362 def StopNotify(self):
363 if not self.notifying:
364 print('\nNot notifying, nothing to do')
366 self.notifying = False
367 print("\nNotify Stopped")
369 def ReadValue(self, options):
370 print("Read Request received\n", "\tUnreadAlertStatusCharacteristic")
372 for val in self.value:
373 val_list.append(dbus.Byte(val))
374 print("\tValue:", "\t", val_list)
377 def WriteValue(self, value, options):
378 print("Write Request received\n", "\tUnreadAlertStatusCharacteristic")
382 self.value = val_list
383 # Check if new value is written
384 print("\tNew value:", "\t", self.value)
385 if not self.value == value:
386 print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
389 class ClientCharacteristicConfigurationDescriptor(Descriptor):
390 CCCD_UUID = '00002902-0000-1000-8000-00805f9b34fb'
392 def __init__(self, bus, index, characteristic):
393 self.value = [dbus.Byte(0)]
401 print("\tValue on read:", "\t", self.value)
404 def WriteValue(self, value):
408 self.value = val_list
409 # Check if new value is written
410 print("New value on write:", "\t", self.value)
411 if not self.value == value:
412 print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)