OSDN Git Service

test: Add simple HFP test script
authorJohan Hedberg <johan.hedberg@intel.com>
Mon, 19 Nov 2012 18:46:31 +0000 (20:46 +0200)
committerJohan Hedberg <johan.hedberg@intel.com>
Mon, 19 Nov 2012 18:53:58 +0000 (20:53 +0200)
Makefile.tools
test/test-hfp [new file with mode: 0755]

index cf87ef5..422c57f 100644 (file)
@@ -209,4 +209,4 @@ EXTRA_DIST += test/sap_client.py test/hsplay test/hsmicro \
                test/test-health-sink test/service-record.dtd \
                test/service-did.xml test/service-spp.xml test/service-opp.xml \
                test/service-ftp.xml test/simple-player test/test-nap \
-               test/test-heartrate test/test-alert
+               test/test-heartrate test/test-alert test/test-hfp
diff --git a/test/test-hfp b/test/test-hfp
new file mode 100755 (executable)
index 0000000..9aa4ec0
--- /dev/null
@@ -0,0 +1,234 @@
+#!/usr/bin/python
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+from gi.repository import GObject
+
+import os
+import sys
+import dbus
+import glib
+import dbus.service
+import dbus.mainloop.glib
+from optparse import OptionParser, make_option
+from socket import SOCK_SEQPACKET, socket
+
+mainloop = None
+audio_supported = True
+
+try:
+       from socket import AF_BLUETOOTH, BTPROTO_SCO
+except:
+       print("WARNING: python compiled without Bluetooth support"
+                                       " - audio will not be available")
+       audio_supported = False
+
+BUF_SIZE = 1024
+
+BDADDR_ANY = '00:00:00:00:00:00'
+
+HF_NREC                        = 0x0001
+HF_3WAY                        = 0x0002
+HF_CLI                 = 0x0004
+HF_VOICE_RECOGNITION   = 0x0008
+HF_REMOTE_VOL          = 0x0010
+HF_ENHANCED_STATUS     = 0x0020
+HF_ENHANCED_CONTROL    = 0x0040
+HF_CODEC_NEGOTIATION   = 0x0080
+
+AG_3WAY                        = 0x0001
+AG_NREC                        = 0x0002
+AG_VOICE_RECOGNITION   = 0x0004
+AG_INBAND_RING         = 0x0008
+AG_VOICE_TAG           = 0x0010
+AG_REJECT_CALL         = 0x0020
+AG_ENHANCED_STATUS     = 0x0040
+AG_ENHANCED_CONTROL    = 0x0080
+AG_EXTENDED_RESULT     = 0x0100
+AG_CODEC_NEGOTIATION   = 0x0200
+
+HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
+                       HF_REMOTE_VOL | HF_ENHANCED_STATUS |
+                       HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)
+
+AVAIL_CODECS = "1,2"
+
+class HfpConnection:
+       slc_complete = False
+       fd = None
+       io_id = 0
+       version = 0
+       features = 0
+       pending = None
+
+       def slc_completed(self):
+               print("SLC establisment complete")
+               self.slc_complete = True
+
+       def slc_next_cmd(self, cmd):
+               if not cmd:
+                       self.send_cmd("AT+BRSF=%u" % (HF_FEATURES))
+               elif (cmd.startswith("AT+BRSF")):
+                       if (self.features & AG_CODEC_NEGOTIATION and
+                                       HF_FEATURES & HF_CODEC_NEGOTIATION):
+                               self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS))
+                       else:
+                               self.send_cmd("AT+CIND=?")
+               elif (cmd.startswith("AT+BAC")):
+                       self.send_cmd("AT+CIND=?")
+               elif (cmd.startswith("AT+CIND=?")):
+                       self.send_cmd("AT+CIND?")
+               elif (cmd.startswith("AT+CIND?")):
+                       self.send_cmd("AT+CMER=3,0,0,1")
+               elif (cmd.startswith("AT+CMER=")):
+                       if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY):
+                               self.send_cmd("AT+CHLD=?")
+                       else:
+                               self.slc_completed()
+               elif (cmd.startswith("AT+CHLD=?")):
+                       self.slc_completed()
+               else:
+                       print("Unknown SLC command completed: %s" % (cmd))
+
+       def io_cb(self, fd, cond):
+               buf = os.read(fd, BUF_SIZE)
+               buf = buf.strip()
+
+               print("Received: %s" % (buf))
+
+               if (buf == "OK" or buf == "ERROR"):
+                       cmd = self.pending
+                       self.pending = None
+
+                       if (not self.slc_complete):
+                               self.slc_next_cmd(cmd)
+
+                       return True
+
+               parts = buf.split(':')
+
+               if (parts[0] == "+BRSF"):
+                       self.features = int(parts[1])
+
+               return True
+
+       def send_cmd(self, cmd):
+               if (self.pending):
+                       print("ERROR: Another command is pending")
+                       return
+
+               print("Sending: %s" % (cmd))
+
+               os.write(self.fd, cmd + "\r\n")
+               self.pending = cmd
+
+       def __init__(self, fd, version, features):
+               self.fd = fd
+               self.version = version
+               self.features = features
+
+               print("Version 0x%04x Features 0x%04x" % (version, features))
+
+               self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)
+
+               self.slc_next_cmd(None)
+
+class HfpProfile(dbus.service.Object):
+       sco_socket = None
+       io_id = 0
+       conns = {}
+
+       def sco_cb(self, sock, cond):
+               (sco, peer) = sock.accept()
+               print("New SCO connection from %s" % (peer))
+
+       def init_sco(self, sock):
+               self.sco_socket = sock
+               self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)
+
+       def __init__(self, bus, path, sco):
+               dbus.service.Object.__init__(self, bus, path)
+
+               if sco:
+                       self.init_sco(sco)
+
+       @dbus.service.method("org.bluez.Profile1",
+                                       in_signature="", out_signature="")
+       def Release(self):
+               print("Release")
+               mainloop.quit()
+
+       @dbus.service.method("org.bluez.Profile1",
+                                       in_signature="", out_signature="")
+       def Cancel(self):
+               print("Cancel")
+
+       @dbus.service.method("org.bluez.Profile1",
+                               in_signature="oha{sv}", out_signature="")
+       def NewConnection(self, path, fd, properties):
+               fd = fd.take()
+               version = 0x0105
+               features = 0
+               print("NewConnection(%s, %d)" % (path, fd))
+               for key in properties.keys():
+                       if key == "Version":
+                               version = properties[key]
+                       elif key == "Features":
+                               features = properties[key]
+
+               conn = HfpConnection(fd, version, features)
+
+               self.conns[path] = conn
+
+if __name__ == '__main__':
+       dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+       bus = dbus.SystemBus()
+
+       manager = dbus.Interface(bus.get_object("org.bluez",
+                               "/org/bluez"), "org.bluez.ProfileManager1")
+
+       option_list = [
+                       make_option("-p", "--path", action="store",
+                                       type="string", dest="path",
+                                       default="/bluez/test/hfp"),
+                       make_option("-n", "--name", action="store",
+                                       type="string", dest="name",
+                                       default=None),
+                       make_option("-C", "--channel", action="store",
+                                       type="int", dest="channel",
+                                       default=None),
+                       ]
+
+       parser = OptionParser(option_list=option_list)
+
+       (options, args) = parser.parse_args()
+
+       mainloop = GObject.MainLoop()
+
+       opts = {
+                       "AutoConnect" : True,
+                       "Version" : dbus.UInt16(0x0106),
+                       "Features" : dbus.UInt16(HF_FEATURES),
+               }
+
+       if (options.name):
+               opts["Name"] = options.name
+
+       if (options.channel is not None):
+               opts["Channel"] = dbus.UInt16(options.channel)
+
+       if audio_supported:
+               sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
+               sco.bind(BDADDR_ANY)
+               sco.listen()
+       else:
+               sco = None
+
+       profile = HfpProfile(bus, options.path, sco)
+
+       manager.RegisterProfile(options.path, "hfp-hf", opts)
+
+       print("Profile registered - waiting for connections")
+
+       mainloop.run()