OSDN Git Service

test: Update GATT examples with the new API
authorLuiz Augusto von Dentz <luiz.von.dentz@intel.com>
Fri, 6 May 2016 08:36:28 +0000 (11:36 +0300)
committerLuiz Augusto von Dentz <luiz.von.dentz@intel.com>
Wed, 18 May 2016 11:16:28 +0000 (14:16 +0300)
test/example-gatt-client
test/example-gatt-server
test/test-gatt-profile

index b77a627..4d1df2f 100755 (executable)
@@ -113,7 +113,7 @@ def hr_msrmt_changed_cb(iface, changed_props, invalidated_props):
 
 def start_client():
     # Read the Body Sensor Location value and print it asynchronously.
-    body_snsr_loc_chrc[0].ReadValue(reply_handler=body_sensor_val_cb,
+    body_snsr_loc_chrc[0].ReadValue({}, reply_handler=body_sensor_val_cb,
                                     error_handler=generic_error_cb,
                                     dbus_interface=GATT_CHRC_IFACE)
 
index 93cf068..71aeb1b 100755 (executable)
@@ -166,13 +166,15 @@ class Characteristic(dbus.service.Object):
 
         return self.get_properties[GATT_CHRC_IFACE]
 
-    @dbus.service.method(GATT_CHRC_IFACE, out_signature='ay')
-    def ReadValue(self):
+    @dbus.service.method(GATT_CHRC_IFACE,
+                        in_signature='a{sv}',
+                        out_signature='ay')
+    def ReadValue(self, options):
         print('Default ReadValue called, returning error')
         raise NotSupportedException()
 
-    @dbus.service.method(GATT_CHRC_IFACE, in_signature='ay')
-    def WriteValue(self, value):
+    @dbus.service.method(GATT_CHRC_IFACE, in_signature='aya{sv}')
+    def WriteValue(self, value, options):
         print('Default WriteValue called, returning error')
         raise NotSupportedException()
 
@@ -222,13 +224,15 @@ class Descriptor(dbus.service.Object):
 
         return self.get_properties[GATT_CHRC_IFACE]
 
-    @dbus.service.method(GATT_DESC_IFACE, out_signature='ay')
-    def ReadValue(self):
+    @dbus.service.method(GATT_DESC_IFACE,
+                        in_signature='a{sv}',
+                        out_signature='ay')
+    def ReadValue(self, options):
         print ('Default ReadValue called, returning error')
         raise NotSupportedException()
 
-    @dbus.service.method(GATT_DESC_IFACE, in_signature='ay')
-    def WriteValue(self, value):
+    @dbus.service.method(GATT_DESC_IFACE, in_signature='aya{sv}')
+    def WriteValue(self, value, options):
         print('Default WriteValue called, returning error')
         raise NotSupportedException()
 
@@ -317,7 +321,7 @@ class BodySensorLocationChrc(Characteristic):
                 ['read'],
                 service)
 
-    def ReadValue(self):
+    def ReadValue(self, options):
         # Return 'Chest' as the sensor location.
         return [ 0x01 ]
 
@@ -331,7 +335,7 @@ class HeartRateControlPointChrc(Characteristic):
                 ['write'],
                 service)
 
-    def WriteValue(self, value):
+    def WriteValue(self, value, options):
         print('Heart Rate Control Point WriteValue called')
 
         if len(value) != 1:
@@ -393,7 +397,7 @@ class BatteryLevelCharacteristic(Characteristic):
         self.notify_battery_level()
         return True
 
-    def ReadValue(self):
+    def ReadValue(self, options):
         print('Battery Level read: ' + repr(self.battery_lvl))
         return [dbus.Byte(self.battery_lvl)]
 
@@ -425,6 +429,7 @@ class TestService(Service):
         Service.__init__(self, bus, index, self.TEST_SVC_UUID, False)
         self.add_characteristic(TestCharacteristic(bus, 0, self))
         self.add_characteristic(TestEncryptCharacteristic(bus, 1, self))
+        self.add_characteristic(TestSecureCharacteristic(bus, 2, self))
 
 class TestCharacteristic(Characteristic):
     """
@@ -445,11 +450,11 @@ class TestCharacteristic(Characteristic):
         self.add_descriptor(
                 CharacteristicUserDescriptionDescriptor(bus, 1, self))
 
-    def ReadValue(self):
+    def ReadValue(self, options):
         print('TestCharacteristic Read: ' + repr(self.value))
         return self.value
 
-    def WriteValue(self, value):
+    def WriteValue(self, value, options):
         print('TestCharacteristic Write: ' + repr(value))
         self.value = value
 
@@ -468,7 +473,7 @@ class TestDescriptor(Descriptor):
                 ['read', 'write'],
                 characteristic)
 
-    def ReadValue(self):
+    def ReadValue(self, options):
         return [
                 dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
         ]
@@ -491,10 +496,10 @@ class CharacteristicUserDescriptionDescriptor(Descriptor):
                 ['read', 'write'],
                 characteristic)
 
-    def ReadValue(self):
+    def ReadValue(self, options):
         return self.value
 
-    def WriteValue(self, value):
+    def WriteValue(self, value, options):
         if not self.writable:
             raise NotPermittedException()
         self.value = value
@@ -517,11 +522,11 @@ class TestEncryptCharacteristic(Characteristic):
         self.add_descriptor(
                 CharacteristicUserDescriptionDescriptor(bus, 3, self))
 
-    def ReadValue(self):
+    def ReadValue(self, options):
         print('TestCharacteristic Read: ' + repr(self.value))
         return self.value
 
-    def WriteValue(self, value):
+    def WriteValue(self, value, options):
         print('TestCharacteristic Write: ' + repr(value))
         self.value = value
 
@@ -539,7 +544,54 @@ class TestEncryptDescriptor(Descriptor):
                 ['encrypt-read', 'encrypt-write'],
                 characteristic)
 
-    def ReadValue(self):
+    def ReadValue(self, options):
+        return [
+                dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
+        ]
+
+
+class TestSecureCharacteristic(Characteristic):
+    """
+    Dummy test characteristic requiring secure connection.
+
+    """
+    TEST_CHRC_UUID = '12345678-1234-5678-1234-56789abcdef5'
+
+    def __init__(self, bus, index, service):
+        Characteristic.__init__(
+                self, bus, index,
+                self.TEST_CHRC_UUID,
+                ['secure-read', 'secure-write'],
+                service)
+        self.value = []
+        self.add_descriptor(TestEncryptDescriptor(bus, 2, self))
+        self.add_descriptor(
+                CharacteristicUserDescriptionDescriptor(bus, 3, self))
+
+    def ReadValue(self, options):
+        print('TestCharacteristic Read: ' + repr(self.value))
+        return self.value
+
+    def WriteValue(self, value, options):
+        print('TestCharacteristic Write: ' + repr(value))
+        self.value = value
+
+
+class TestSecureDescriptor(Descriptor):
+    """
+    Dummy test descriptor requiring secure connection. Returns a static value.
+
+    """
+    TEST_DESC_UUID = '12345678-1234-5678-1234-56789abcdef6'
+
+    def __init__(self, bus, index, characteristic):
+        Descriptor.__init__(
+                self, bus, index,
+                self.TEST_DESC_UUID,
+                ['secure-read', 'secure-write'],
+                characteristic)
+
+    def ReadValue(self, options):
         return [
                 dbus.Byte('T'), dbus.Byte('e'), dbus.Byte('s'), dbus.Byte('t')
         ]
index ad320b1..995a659 100755 (executable)
@@ -15,46 +15,116 @@ except ImportError:
   import gobject as GObject
 import bluezutils
 
-class GattProfile(dbus.service.Object):
-       @dbus.service.method("org.bluez.GattProfile1",
-                                       in_signature="", out_signature="")
-       def Release(self):
-               print("Release")
-               mainloop.quit()
+BLUEZ_SERVICE_NAME = 'org.bluez'
+GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
+DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
+DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties'
 
-if __name__ == '__main__':
-       dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+GATT_PROFILE_IFACE = 'org.bluez.GattProfile1'
+
+
+class InvalidArgsException(dbus.exceptions.DBusException):
+    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
+
+
+class Application(dbus.service.Object):
+    def __init__(self, bus):
+        self.path = '/'
+        self.profiles = []
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    def add_profile(self, profile):
+        self.profiles.append(profile)
+
+    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
+    def GetManagedObjects(self):
+        response = {}
+        print('GetManagedObjects')
+
+        for profile in self.profiles:
+            response[profile.get_path()] = profile.get_properties()
+
+        return response
+
+
+class Profile(dbus.service.Object):
+    PATH_BASE = '/org/bluez/example/profile'
 
-       bus = dbus.SystemBus()
+    def __init__(self, bus, uuids):
+        self.path = self.PATH_BASE
+        self.bus = bus
+        self.uuids = uuids
+        dbus.service.Object.__init__(self, bus, self.path)
+
+    def get_properties(self):
+        return {
+            GATT_PROFILE_IFACE: {
+                'UUIDs': self.uuids,
+            }
+        }
+
+    def get_path(self):
+        return dbus.ObjectPath(self.path)
+
+    @dbus.service.method(GATT_PROFILE_IFACE,
+                        in_signature="",
+                        out_signature="")
+    def Release(self):
+        print("Release")
+        mainloop.quit()
+
+    @dbus.service.method(DBUS_PROP_IFACE,
+                         in_signature='s',
+                         out_signature='a{sv}')
+    def GetAll(self, interface):
+        if interface != GATT_PROFILE_IFACE:
+            raise InvalidArgsException()
+
+        return self.get_properties[GATT_PROFILE_IFACE]
+
+
+def register_app_cb():
+    print('GATT application registered')
+
+
+def register_app_error_cb(error):
+    print('Failed to register application: ' + str(error))
+    mainloop.quit()
+
+if __name__ == '__main__':
+    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 
-       path = bluezutils.find_adapter().object_path
+    bus = dbus.SystemBus()
 
-       manager = dbus.Interface(bus.get_object("org.bluez", path),
-                               "org.bluez.GattManager1")
+    path = bluezutils.find_adapter().object_path
 
-       option_list = [
-                       make_option("-u", "--uuid", action="store",
-                                       type="string", dest="uuid",
-                                       default=None),
-                       make_option("-p", "--path", action="store",
-                                       type="string", dest="path",
-                                       default="/foo/bar/profile"),
-                       ]
+    manager = dbus.Interface(bus.get_object("org.bluez", path),
+                            GATT_MANAGER_IFACE)
 
-       opts = dbus.Dictionary({ }, signature='sv')
+    option_list = [make_option("-u", "--uuid", action="store",
+                                type="string", dest="uuid",
+                                default=None),
+    ]
 
-       parser = OptionParser(option_list=option_list)
+    opts = dbus.Dictionary({}, signature='sv')
 
-       (options, args) = parser.parse_args()
+    parser = OptionParser(option_list=option_list)
 
-       profile = GattProfile(bus, options.path)
+    (options, args) = parser.parse_args()
 
-       mainloop = GObject.MainLoop()
+    mainloop = GObject.MainLoop()
 
-       if not options.uuid:
-               options.uuid = str(uuid.uuid4())
+    if not options.uuid:
+        options.uuid = str(uuid.uuid4())
 
-       uuids = { options.uuid }
-       manager.RegisterProfile(options.path, uuids, opts)
+    app = Application(bus)
+    profile = Profile(bus, [options.uuid])
+    app.add_profile(profile)
+    manager.RegisterApplication(app.get_path(), {},
+                                reply_handler=register_app_cb,
+                                error_handler=register_app_error_cb)
 
-       mainloop.run()
+    mainloop.run()