granicus.if.org Git - esp-idf/blob - tools/ble/lib_gatt.py
Add NimBLE bleprph,blecent,blehr example tests
[esp-idf] / tools / ble / lib_gatt.py
1 #!/usr/bin/env python
2 #
3 # Copyright 2019 Espressif Systems (Shanghai) PTE LTD
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 #     http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 # Creating GATT Application which then becomes available to remote devices.
19
20 from __future__ import print_function
21 import sys
22
23 try:
24     import dbus
25     import dbus.service
26 except ImportError as e:
27     if 'linux' not in sys.platform:
28         sys.exit("Error: Only supported on Linux platform")
29     print(e)
30     print("Install packages `libgirepository1.0-dev gir1.2-gtk-3.0 libcairo2-dev libdbus-1-dev libdbus-glib-1-dev` for resolving the issue")
31     print("Run `pip install -r $IDF_PATH/tools/ble/requirements.txt` for resolving the issue")
32     raise
33
# Module-level handle to the UnreadAlertStatusCharacteristic instance; it is
# assigned when AlertNotificationService is constructed.
alert_status_char_obj = None

# Progress flags. Each one is flipped to True by the corresponding handler
# in this module (GetManagedObjects, ReadValue, WriteValue, StartNotify).
GATT_APP_OBJ = False
CHAR_READ = CHAR_WRITE = CHAR_SUBSCRIBE = False

# Generic D-Bus interfaces.
DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'

# BlueZ GATT interfaces.
GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
GATT_SERVICE_IFACE = 'org.bluez.GattService1'
GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
GATT_DESC_IFACE = 'org.bluez.GattDescriptor1'
47
48
class InvalidArgsException(dbus.exceptions.DBusException):
    """
    D-Bus error raised by the GetAll handlers in this module when they are
    asked about an interface other than the one they implement.
    """
    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
51
52
class NotSupportedException(dbus.exceptions.DBusException):
    """
    D-Bus error raised by the default ReadValue / WriteValue / StartNotify /
    StopNotify handlers, i.e. when a subclass did not override them.
    """
    _dbus_error_name = 'org.bluez.Error.NotSupported'
55
56
class Application(dbus.service.Object):
    """
    org.bluez.GattApplication1 interface implementation

    Holds the list of services and reports every service, characteristic
    and descriptor below it through the ObjectManager interface.
    """

    def __init__(self, bus, path):
        """
        Create the application with a single AlertNotificationService
        (index '0001') and export it on `bus` at `path`.
        """
        self.path = path
        self.services = []
        self.add_service(AlertNotificationService(bus, '0001'))
        dbus.service.Object.__init__(self, bus, self.path)

    def __del__(self):
        pass

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Append `service` to the list of services of this application."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """
        Return a dict mapping the object path of every service,
        characteristic and descriptor to its properties.

        Also sets the module-level GATT_APP_OBJ flag to True.
        """
        global GATT_APP_OBJ
        managed = {}

        for svc in self.services:
            managed[svc.get_path()] = svc.get_properties()
            for characteristic in svc.get_characteristics():
                managed[characteristic.get_path()] = characteristic.get_properties()
                for descriptor in characteristic.get_descriptors():
                    managed[descriptor.get_path()] = descriptor.get_properties()

        GATT_APP_OBJ = True
        return managed

    def Release(self):
        pass
97
98
class Service(dbus.service.Object):
    """
    org.bluez.GattService1 interface implementation

    The object path is PATH_BASE followed by the service index.
    """
    PATH_BASE = '/org/bluez/hci0/service'

    def __init__(self, bus, index, uuid, primary=False):
        """
        Store the service attributes and export the object on `bus`.

        `index` is appended to PATH_BASE to form the object path.
        """
        self.bus = bus
        self.uuid = uuid
        self.primary = primary
        self.characteristics = []
        self.path = self.PATH_BASE + str(index)
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the service properties keyed by GATT_SERVICE_IFACE."""
        chrc_paths = dbus.Array(self.get_characteristic_paths(), signature='o')
        service_props = {
            'UUID': self.uuid,
            'Primary': self.primary,
            'Characteristics': chrc_paths,
        }
        return {GATT_SERVICE_IFACE: service_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_characteristic(self, characteristic):
        """Append `characteristic` to this service."""
        self.characteristics.append(characteristic)

    def get_characteristic_paths(self):
        """Return a list with the object path of every characteristic."""
        return [chrc.get_path() for chrc in self.characteristics]

    def get_characteristics(self):
        """Return the list of characteristics added to this service."""
        return self.characteristics

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """
        Return the GATT_SERVICE_IFACE properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface == GATT_SERVICE_IFACE:
            return self.get_properties()[GATT_SERVICE_IFACE]
        raise InvalidArgsException()
146
147
class Characteristic(dbus.service.Object):
    """
    org.bluez.GattCharacteristic1 interface implementation

    Base class: the default ReadValue / WriteValue / StartNotify /
    StopNotify handlers only print a message and raise
    NotSupportedException; subclasses override the ones they support.
    """
    def __init__(self, bus, index, uuid, flags, service):
        """
        Store the characteristic attributes and export the object on `bus`.

        The object path is the owning service's path followed by
        '/char' and `index`. The value starts as [0].
        """
        self.bus = bus
        self.uuid = uuid
        self.service = service
        self.flags = flags
        self.value = [0]
        self.descriptors = []
        self.path = service.path + '/char' + str(index)
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the characteristic properties keyed by GATT_CHRC_IFACE."""
        desc_paths = dbus.Array(self.get_descriptor_paths(), signature='o')
        chrc_props = {
            'Service': self.service.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
            'Value': self.value,
            'Descriptors': desc_paths,
        }
        return {GATT_CHRC_IFACE: chrc_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    def add_descriptor(self, descriptor):
        """Append `descriptor` to this characteristic."""
        self.descriptors.append(descriptor)

    def get_descriptor_paths(self):
        """Return a list with the object path of every descriptor."""
        return [desc.get_path() for desc in self.descriptors]

    def get_descriptors(self):
        """Return the list of descriptors added to this characteristic."""
        return self.descriptors

    @dbus.service.method(DBUS_PROP_IFACE, in_signature='s', out_signature='a{sv}')
    def GetAll(self, interface):
        """
        Return the GATT_CHRC_IFACE properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface == GATT_CHRC_IFACE:
            return self.get_properties()[GATT_CHRC_IFACE]
        raise InvalidArgsException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('\nDefault ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('\nDefault WriteValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StartNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StartNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_CHRC_IFACE)
    def StopNotify(self):
        """Default handler: always raises NotSupportedException."""
        print('Default StopNotify called, returning error')
        raise NotSupportedException()

    @dbus.service.signal(DBUS_PROP_IFACE,
                         signature='sa{sv}as')
    def PropertiesChanged(self, interface, changed, invalidated):
        """PropertiesChanged signal; the body only prints a trace line."""
        print("\nProperties Changed")
220
221
class Descriptor(dbus.service.Object):
    """
    org.bluez.GattDescriptor1 interface implementation

    Base class: the default ReadValue / WriteValue handlers only print a
    message and raise NotSupportedException.
    """
    def __init__(self, bus, index, uuid, flags, characteristic):
        """
        Store the descriptor attributes and export the object on `bus`.

        The object path is the owning characteristic's path followed by
        '/desc' and `index`.
        """
        self.bus = bus
        self.uuid = uuid
        self.flags = flags
        self.chrc = characteristic
        self.path = characteristic.path + '/desc' + str(index)
        dbus.service.Object.__init__(self, bus, self.path)

    def get_properties(self):
        """Return the descriptor properties keyed by GATT_DESC_IFACE."""
        desc_props = {
            'Characteristic': self.chrc.get_path(),
            'UUID': self.uuid,
            'Flags': self.flags,
        }
        return {GATT_DESC_IFACE: desc_props}

    def get_path(self):
        """Return this object's path as a dbus.ObjectPath."""
        return dbus.ObjectPath(self.path)

    @dbus.service.method(DBUS_PROP_IFACE,
                         in_signature='s',
                         out_signature='a{sv}')
    def GetAll(self, interface):
        """
        Return the GATT_DESC_IFACE properties.

        Raises InvalidArgsException for any other interface name.
        """
        if interface == GATT_DESC_IFACE:
            return self.get_properties()[GATT_DESC_IFACE]
        raise InvalidArgsException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='a{sv}', out_signature='ay')
    def ReadValue(self, options):
        """Default read handler: always raises NotSupportedException."""
        print('Default ReadValue called, returning error')
        raise NotSupportedException()

    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
    def WriteValue(self, value, options):
        """Default write handler: always raises NotSupportedException."""
        print('Default WriteValue called, returning error')
        raise NotSupportedException()
264
265
class AlertNotificationService(Service):
    """
    Primary service (UUID TEST_SVC_UUID) holding three characteristics:
    supported-new-alert-category ('0001'), alert-notification control
    point ('0002') and unread-alert-status ('0003').
    """
    TEST_SVC_UUID = '00001811-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index):
        """
        Export the service and create its characteristics.

        The unread-alert-status characteristic is also stored in the
        module-level `alert_status_char_obj`.
        """
        global alert_status_char_obj
        Service.__init__(self, bus, index, self.TEST_SVC_UUID, primary=True)

        # Characteristics are created in index order, then attached.
        new_alert_chrc = SupportedNewAlertCategoryCharacteristic(bus, '0001', self)
        control_point_chrc = AlertNotificationControlPointCharacteristic(bus, '0002', self)
        alert_status_char_obj = UnreadAlertStatusCharacteristic(bus, '0003', self)

        for chrc in (new_alert_chrc, control_point_chrc, alert_status_char_obj):
            self.add_characteristic(chrc)
276
277
class SupportedNewAlertCategoryCharacteristic(Characteristic):
    """Read-only characteristic whose initial value is the single byte 2."""
    SUPPORT_NEW_ALERT_UUID = '00002A47-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        """Export the characteristic with the 'read' flag and set its value."""
        Characteristic.__init__(self, bus, index, self.SUPPORT_NEW_ALERT_UUID, ['read'], service)
        self.value = [dbus.Byte(2)]

    def ReadValue(self, options):
        """
        Return the current value as a list of dbus.Byte and set the
        module-level CHAR_READ flag to True.
        """
        global CHAR_READ
        CHAR_READ = True
        payload = [dbus.Byte(item) for item in self.value]
        print("Read Request received\n", "\tSupportedNewAlertCategoryCharacteristic")
        print("\tValue:", "\t", payload)
        return payload
299
300
class AlertNotificationControlPointCharacteristic(Characteristic):
    """Readable and writable characteristic whose initial value is the single byte 0."""
    ALERT_NOTIF_UUID = '00002A44-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        """Export the characteristic with 'read' and 'write' flags and set its value."""
        Characteristic.__init__(self, bus, index, self.ALERT_NOTIF_UUID, ['read','write'], service)
        self.value = [dbus.Byte(0)]

    def ReadValue(self, options):
        """Return the current value as a list of dbus.Byte."""
        payload = [dbus.Byte(item) for item in self.value]
        print("Read Request received\n", "\tAlertNotificationControlPointCharacteristic")
        print("\tValue:", "\t", payload)
        return payload

    def WriteValue(self, value, options):
        """
        Replace the stored value with a list copy of `value` and set the
        module-level CHAR_WRITE flag to True.
        """
        global CHAR_WRITE
        CHAR_WRITE = True
        print("Write Request received\n", "\tAlertNotificationControlPointCharacteristic")
        print("\tCurrent value:", "\t", self.value)
        self.value = list(value)
        # Check if new value is written
        print("\tNew value:", "\t", self.value)
        if self.value != value:
            print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
334
335
class UnreadAlertStatusCharacteristic(Characteristic):
    """
    Characteristic with 'read', 'write' and 'notify' flags. It owns one
    ClientCharacteristicConfigurationDescriptor and tracks whether
    notifications are currently enabled in `self.notifying`.
    """
    UNREAD_ALERT_STATUS_UUID = '00002A45-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, service):
        """Export the characteristic, set its value to [0] and attach the descriptor."""
        Characteristic.__init__(
            self, bus, index, self.UNREAD_ALERT_STATUS_UUID,
            ['read', 'write', 'notify'], service)
        self.value = [dbus.Byte(0)]
        self.cccd_obj = ClientCharacteristicConfigurationDescriptor(bus, '0001', self)
        self.add_descriptor(self.cccd_obj)
        self.notifying = False

    def StartNotify(self):
        """
        Enable notifications.

        CHAR_SUBSCRIBE is set on every call, even when already notifying.
        On the first call the descriptor value is written as bytes
        (1, 0) and read back.
        """
        global CHAR_SUBSCRIBE
        CHAR_SUBSCRIBE = True

        if not self.notifying:
            self.notifying = True
            print("\nNotify Started")
            self.cccd_obj.WriteValue([dbus.Byte(1), dbus.Byte(0)])
            self.cccd_obj.ReadValue()
            return
        print('\nAlready notifying, nothing to do')

    def StopNotify(self):
        """Disable notifications; does nothing but print when not notifying."""
        if self.notifying:
            self.notifying = False
            print("\nNotify Stopped")
            return
        print('\nNot notifying, nothing to do')

    def ReadValue(self, options):
        """Return the current value as a list of dbus.Byte."""
        print("Read Request received\n", "\tUnreadAlertStatusCharacteristic")
        payload = [dbus.Byte(item) for item in self.value]
        print("\tValue:", "\t", payload)
        return payload

    def WriteValue(self, value, options):
        """Replace the stored value with a list copy of `value`."""
        print("Write Request received\n", "\tUnreadAlertStatusCharacteristic")
        self.value = list(value)
        # Check if new value is written
        print("\tNew value:", "\t", self.value)
        if self.value != value:
            print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)
387
388
class ClientCharacteristicConfigurationDescriptor(Descriptor):
    """
    Readable and writable descriptor with UUID CCCD_UUID.

    The stored value starts as [dbus.Byte(0)].
    UnreadAlertStatusCharacteristic.StartNotify writes and reads it
    in-process.
    """
    CCCD_UUID = '00002902-0000-1000-8000-00805f9b34fb'

    def __init__(self, bus, index, characteristic):
        """Set the initial value, then export the descriptor on `bus`."""
        self.value = [dbus.Byte(0)]
        Descriptor.__init__(
            self, bus, index,
            self.CCCD_UUID,
            ['read', 'write'],
            characteristic)

    def ReadValue(self, options=None):
        """
        Print and return the stored value.

        `options` is the dict delivered by the D-Bus ReadValue method that
        Descriptor declares with in_signature 'a{sv}'; it is accepted but
        not used. It defaults to None so in-process callers can keep
        calling ReadValue() without arguments.
        """
        print("\tValue on read:", "\t", self.value)
        return self.value

    def WriteValue(self, value, options=None):
        """
        Replace the stored value with a list copy of `value`.

        `options` is the dict delivered by the D-Bus WriteValue method that
        Descriptor declares with in_signature 'aya{sv}'; it is accepted but
        not used. It defaults to None so in-process callers can keep
        passing only `value`.
        """
        self.value = list(value)
        # Check if new value is written
        print("New value on write:", "\t", self.value)
        if self.value != value:
            print("Failed: Write Request\n\tNew value not written\tCurrent value:", self.value)