OSDN Git Service

tests/example-gatt-client: Fix using invalid property
[android-x86/external-bluetooth-bluez.git] / test / example-gatt-client
1 #!/usr/bin/env python3
2
3 import dbus
4 try:
5   from gi.repository import GObject
6 except ImportError:
7   import gobject as GObject
8 import sys
9
10 from dbus.mainloop.glib import DBusGMainLoop
11
12 bus = None
13 mainloop = None
14
15 BLUEZ_SERVICE_NAME = 'org.bluez'
16 DBUS_OM_IFACE =      'org.freedesktop.DBus.ObjectManager'
17 DBUS_PROP_IFACE =    'org.freedesktop.DBus.Properties'
18
19 GATT_SERVICE_IFACE = 'org.bluez.GattService1'
20 GATT_CHRC_IFACE =    'org.bluez.GattCharacteristic1'
21
22 HR_SVC_UUID =        '0000180d-0000-1000-8000-00805f9b34fb'
23 HR_MSRMT_UUID =      '00002a37-0000-1000-8000-00805f9b34fb'
24 BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
25 HR_CTRL_PT_UUID =    '00002a39-0000-1000-8000-00805f9b34fb'
26
27 # The objects that we interact with.
28 hr_service = None
29 hr_msrmt_chrc = None
30 body_snsr_loc_chrc = None
31 hr_ctrl_pt_chrc = None
32
33
34 def generic_error_cb(error):
35     print('D-Bus call failed: ' + str(error))
36     mainloop.quit()
37
38
39 def body_sensor_val_to_str(val):
40     if val == 0:
41         return 'Other'
42     if val == 1:
43         return 'Chest'
44     if val == 2:
45         return 'Wrist'
46     if val == 3:
47         return 'Finger'
48     if val == 4:
49         return 'Hand'
50     if val == 5:
51         return 'Ear Lobe'
52     if val == 6:
53         return 'Foot'
54
55     return 'Reserved value'
56
57
58 def sensor_contact_val_to_str(val):
59     if val == 0 or val == 1:
60         return 'not supported'
61     if val == 2:
62         return 'no contact detected'
63     if val == 3:
64         return 'contact detected'
65
66     return 'invalid value'
67
68
69 def body_sensor_val_cb(value):
70     if len(value) != 1:
71         print('Invalid body sensor location value: ' + repr(value))
72         return
73
74     print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
75
76
77 def hr_msrmt_start_notify_cb():
78     print('HR Measurement notifications enabled')
79
80
81 def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
82     if iface != GATT_CHRC_IFACE:
83         return
84
85     if not len(changed_props):
86         return
87
88     value = changed_props.get('Value', None)
89     if not value:
90         return
91
92     print('New HR Measurement')
93
94     flags = value[0]
95     value_format = flags & 0x01
96     sc_status = (flags >> 1) & 0x03
97     ee_status = flags & 0x08
98
99     if value_format == 0x00:
100         hr_msrmt = value[1]
101         next_ind = 2
102     else:
103         hr_msrmt = value[1] | (value[2] << 8)
104         next_ind = 3
105
106     print('\tHR: ' + str(int(hr_msrmt)))
107     print('\tSensor Contact status: ' +
108           sensor_contact_val_to_str(sc_status))
109
110     if ee_status:
111         print('\tEnergy Expended: ' + str(int(value[next_ind])))
112
113
114 def start_client():
115     # Read the Body Sensor Location value and print it asynchronously.
116     body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
117                                     error_handler=generic_error_cb,
118                                     dbus_interface=GATT_CHRC_IFACE)
119
120     # Listen to PropertiesChanged signals from the Heart Measurement
121     # Characteristic.
122     hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
123     hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
124                                           hr_msrmt_changed_cb)
125
126     # Subscribe to Heart Rate Measurement notifications.
127     hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
128                                  error_handler=generic_error_cb,
129                                  dbus_interface=GATT_CHRC_IFACE)
130
131
132 def process_chrc(chrc_path):
133     chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
134     chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
135                              dbus_interface=DBUS_PROP_IFACE)
136
137     uuid = chrc_props['UUID']
138
139     if uuid == HR_MSRMT_UUID:
140         global hr_msrmt_chrc
141         hr_msrmt_chrc = (chrc, chrc_props)
142     elif uuid == BODY_SNSR_LOC_UUID:
143         global body_snsr_loc_chrc
144         body_snsr_loc_chrc = (chrc, chrc_props)
145     elif uuid == HR_CTRL_PT_UUID:
146         global hr_ctrl_pt_chrc
147         hr_ctrl_pt_chrc = (chrc, chrc_props)
148     else:
149         print('Unrecognized characteristic: ' + uuid)
150
151     return True
152
153
154 def process_hr_service(service_path, chrc_paths):
155     service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
156     service_props = service.GetAll(GATT_SERVICE_IFACE,
157                                    dbus_interface=DBUS_PROP_IFACE)
158
159     uuid = service_props['UUID']
160
161     if uuid != HR_SVC_UUID:
162         return False
163
164     print('Heart Rate Service found: ' + service_path)
165
166     # Process the characteristics.
167     for chrc_path in chrc_paths:
168         process_chrc(chrc_path)
169
170     global hr_service
171     hr_service = (service, service_props, service_path)
172
173     return True
174
175
176 def interfaces_removed_cb(object_path, interfaces):
177     if not hr_service:
178         return
179
180     if object_path == hr_service[2]:
181         print('Service was removed')
182         mainloop.quit()
183
184
185 def main():
186     # Set up the main loop.
187     DBusGMainLoop(set_as_default=True)
188     global bus
189     bus = dbus.SystemBus()
190     global mainloop
191     mainloop = GObject.MainLoop()
192
193     om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
194     om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
195
196     objects = om.GetManagedObjects()
197     chrcs = []
198
199     for path, interfaces in objects.items():
200         if GATT_CHRC_IFACE not in interfaces.keys():
201             continue
202         chrcs.append(path)
203
204     for path, interfaces in objects.items():
205         if GATT_SERVICE_IFACE not in interfaces.keys():
206             continue
207
208         chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
209
210         if process_hr_service(path, chrc_paths):
211             break
212
213     if not hr_service:
214         print('No Heart Rate Service found')
215         sys.exit(1)
216
217     start_client()
218
219     mainloop.run()
220
221
222 if __name__ == '__main__':
223     main()