5 from gi.repository import GObject
7 import gobject as GObject
10 from dbus.mainloop.glib import DBusGMainLoop
15 BLUEZ_SERVICE_NAME = 'org.bluez'
16 DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
17 DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
19 GATT_SERVICE_IFACE = 'org.bluez.GattService1'
20 GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
22 HR_SVC_UUID = '0000180d-0000-1000-8000-00805f9b34fb'
23 HR_MSRMT_UUID = '00002a37-0000-1000-8000-00805f9b34fb'
24 BODY_SNSR_LOC_UUID = '00002a38-0000-1000-8000-00805f9b34fb'
25 HR_CTRL_PT_UUID = '00002a39-0000-1000-8000-00805f9b34fb'
27 # The objects that we interact with.
30 body_snsr_loc_chrc = None
31 hr_ctrl_pt_chrc = None
34 def generic_error_cb(error):
35 print('D-Bus call failed: ' + str(error))
39 def body_sensor_val_to_str(val):
55 return 'Reserved value'
58 def sensor_contact_val_to_str(val):
59 if val == 0 or val == 1:
60 return 'not supported'
62 return 'no contact detected'
64 return 'contact detected'
66 return 'invalid value'
69 def body_sensor_val_cb(value):
71 print('Invalid body sensor location value: ' + repr(value))
74 print('Body sensor location value: ' + body_sensor_val_to_str(value[0]))
77 def hr_msrmt_start_notify_cb():
78 print('HR Measurement notifications enabled')
81 def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
82 if iface != GATT_CHRC_IFACE:
85 if not len(changed_props):
88 value = changed_props.get('Value', None)
92 print('New HR Measurement')
95 value_format = flags & 0x01
96 sc_status = (flags >> 1) & 0x03
97 ee_status = flags & 0x08
99 if value_format == 0x00:
103 hr_msrmt = value[1] | (value[2] << 8)
106 print('\tHR: ' + str(int(hr_msrmt)))
107 print('\tSensor Contact status: ' +
108 sensor_contact_val_to_str(sc_status))
111 print('\tEnergy Expended: ' + str(int(value[next_ind])))
115 # Read the Body Sensor Location value and print it asynchronously.
116 body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
117 error_handler=generic_error_cb,
118 dbus_interface=GATT_CHRC_IFACE)
120 # Listen to PropertiesChanged signals from the Heart Measurement
122 hr_msrmt_prop_iface = dbus.Interface(hr_msrmt_chrc[0], DBUS_PROP_IFACE)
123 hr_msrmt_prop_iface.connect_to_signal("PropertiesChanged",
126 # Subscribe to Heart Rate Measurement notifications.
127 hr_msrmt_chrc[0].StartNotify(reply_handler=hr_msrmt_start_notify_cb,
128 error_handler=generic_error_cb,
129 dbus_interface=GATT_CHRC_IFACE)
132 def process_chrc(chrc_path):
133 chrc = bus.get_object(BLUEZ_SERVICE_NAME, chrc_path)
134 chrc_props = chrc.GetAll(GATT_CHRC_IFACE,
135 dbus_interface=DBUS_PROP_IFACE)
137 uuid = chrc_props['UUID']
139 if uuid == HR_MSRMT_UUID:
141 hr_msrmt_chrc = (chrc, chrc_props)
142 elif uuid == BODY_SNSR_LOC_UUID:
143 global body_snsr_loc_chrc
144 body_snsr_loc_chrc = (chrc, chrc_props)
145 elif uuid == HR_CTRL_PT_UUID:
146 global hr_ctrl_pt_chrc
147 hr_ctrl_pt_chrc = (chrc, chrc_props)
149 print('Unrecognized characteristic: ' + uuid)
154 def process_hr_service(service_path, chrc_paths):
155 service = bus.get_object(BLUEZ_SERVICE_NAME, service_path)
156 service_props = service.GetAll(GATT_SERVICE_IFACE,
157 dbus_interface=DBUS_PROP_IFACE)
159 uuid = service_props['UUID']
161 if uuid != HR_SVC_UUID:
164 print('Heart Rate Service found: ' + service_path)
166 # Process the characteristics.
167 for chrc_path in chrc_paths:
168 process_chrc(chrc_path)
171 hr_service = (service, service_props, service_path)
176 def interfaces_removed_cb(object_path, interfaces):
180 if object_path == hr_service[2]:
181 print('Service was removed')
186 # Set up the main loop.
187 DBusGMainLoop(set_as_default=True)
189 bus = dbus.SystemBus()
191 mainloop = GObject.MainLoop()
193 om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
194 om.connect_to_signal('InterfacesRemoved', interfaces_removed_cb)
196 objects = om.GetManagedObjects()
199 for path, interfaces in objects.items():
200 if GATT_CHRC_IFACE not in interfaces.keys():
204 for path, interfaces in objects.items():
205 if GATT_SERVICE_IFACE not in interfaces.keys():
208 chrc_paths = [d for d in chrcs if d.startswith(path + "/")]
210 if process_hr_service(path, chrc_paths):
214 print('No Heart Rate Service found')
222 if __name__ == '__main__':