From: Johan Hedberg Date: Mon, 19 Nov 2012 18:46:31 +0000 (+0200) Subject: test: Add simple HFP test script X-Git-Tag: android-x86-4.4-r3~12092 X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=2e82b4bc2f0ed281e9ab2de0bada2173871b90aa;p=android-x86%2Fexternal-bluetooth-bluez.git test: Add simple HFP test script --- diff --git a/Makefile.tools b/Makefile.tools index cf87ef530..422c57f7f 100644 --- a/Makefile.tools +++ b/Makefile.tools @@ -209,4 +209,4 @@ EXTRA_DIST += test/sap_client.py test/hsplay test/hsmicro \ test/test-health-sink test/service-record.dtd \ test/service-did.xml test/service-spp.xml test/service-opp.xml \ test/service-ftp.xml test/simple-player test/test-nap \ - test/test-heartrate test/test-alert + test/test-heartrate test/test-alert test/test-hfp diff --git a/test/test-hfp b/test/test-hfp new file mode 100755 index 000000000..9aa4ec0e2 --- /dev/null +++ b/test/test-hfp @@ -0,0 +1,234 @@ +#!/usr/bin/python + +from __future__ import absolute_import, print_function, unicode_literals + +from gi.repository import GObject + +import os +import sys +import dbus +import glib +import dbus.service +import dbus.mainloop.glib +from optparse import OptionParser, make_option +from socket import SOCK_SEQPACKET, socket + +mainloop = None +audio_supported = True + +try: + from socket import AF_BLUETOOTH, BTPROTO_SCO +except: + print("WARNING: python compiled without Bluetooth support" + " - audio will not be available") + audio_supported = False + +BUF_SIZE = 1024 + +BDADDR_ANY = '00:00:00:00:00:00' + +HF_NREC = 0x0001 +HF_3WAY = 0x0002 +HF_CLI = 0x0004 +HF_VOICE_RECOGNITION = 0x0008 +HF_REMOTE_VOL = 0x0010 +HF_ENHANCED_STATUS = 0x0020 +HF_ENHANCED_CONTROL = 0x0040 +HF_CODEC_NEGOTIATION = 0x0080 + +AG_3WAY = 0x0001 +AG_NREC = 0x0002 +AG_VOICE_RECOGNITION = 0x0004 +AG_INBAND_RING = 0x0008 +AG_VOICE_TAG = 0x0010 +AG_REJECT_CALL = 0x0020 +AG_ENHANCED_STATUS = 0x0040 +AG_ENHANCED_CONTROL = 0x0080 +AG_EXTENDED_RESULT = 0x0100 +AG_CODEC_NEGOTIATION = 0x0200 + +HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION | + HF_REMOTE_VOL | HF_ENHANCED_STATUS | + HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION) + +AVAIL_CODECS = "1,2" + +class HfpConnection: + slc_complete = False + fd = None + io_id = 0 + version = 0 + features = 0 + pending = None + + def slc_completed(self): + print("SLC establisment complete") + self.slc_complete = True + + def slc_next_cmd(self, cmd): + if not cmd: + self.send_cmd("AT+BRSF=%u" % (HF_FEATURES)) + elif (cmd.startswith("AT+BRSF")): + if (self.features & AG_CODEC_NEGOTIATION and + HF_FEATURES & HF_CODEC_NEGOTIATION): + self.send_cmd("AT+BAC=%s" % (AVAIL_CODECS)) + else: + self.send_cmd("AT+CIND=?") + elif (cmd.startswith("AT+BAC")): + self.send_cmd("AT+CIND=?") + elif (cmd.startswith("AT+CIND=?")): + self.send_cmd("AT+CIND?") + elif (cmd.startswith("AT+CIND?")): + self.send_cmd("AT+CMER=3,0,0,1") + elif (cmd.startswith("AT+CMER=")): + if (HF_FEATURES & HF_3WAY and self.features & AG_3WAY): + self.send_cmd("AT+CHLD=?") + else: + self.slc_completed() + elif (cmd.startswith("AT+CHLD=?")): + self.slc_completed() + else: + print("Unknown SLC command completed: %s" % (cmd)) + + def io_cb(self, fd, cond): + buf = os.read(fd, BUF_SIZE) + buf = buf.strip() + + print("Received: %s" % (buf)) + + if (buf == "OK" or buf == "ERROR"): + cmd = self.pending + self.pending = None + + if (not self.slc_complete): + self.slc_next_cmd(cmd) + + return True + + parts = buf.split(':') + + if (parts[0] == "+BRSF"): + self.features = int(parts[1]) + + return True + + def send_cmd(self, cmd): + if (self.pending): + print("ERROR: Another command is pending") + return + + print("Sending: %s" % (cmd)) + + os.write(self.fd, cmd + "\r\n") + self.pending = cmd + + def __init__(self, fd, version, features): + self.fd = fd + self.version = version + self.features = features + + print("Version 0x%04x Features 0x%04x" % (version, features)) + + self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb) + + self.slc_next_cmd(None) + +class HfpProfile(dbus.service.Object): + sco_socket = None + io_id = 0 + conns = {} + + def sco_cb(self, sock, cond): + (sco, peer) = sock.accept() + print("New SCO connection from %s" % (peer)) + + def init_sco(self, sock): + self.sco_socket = sock + self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb) + + def __init__(self, bus, path, sco): + dbus.service.Object.__init__(self, bus, path) + + if sco: + self.init_sco(sco) + + @dbus.service.method("org.bluez.Profile1", + in_signature="", out_signature="") + def Release(self): + print("Release") + mainloop.quit() + + @dbus.service.method("org.bluez.Profile1", + in_signature="", out_signature="") + def Cancel(self): + print("Cancel") + + @dbus.service.method("org.bluez.Profile1", + in_signature="oha{sv}", out_signature="") + def NewConnection(self, path, fd, properties): + fd = fd.take() + version = 0x0105 + features = 0 + print("NewConnection(%s, %d)" % (path, fd)) + for key in properties.keys(): + if key == "Version": + version = properties[key] + elif key == "Features": + features = properties[key] + + conn = HfpConnection(fd, version, features) + + self.conns[path] = conn + +if __name__ == '__main__': + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + + manager = dbus.Interface(bus.get_object("org.bluez", + "/org/bluez"), "org.bluez.ProfileManager1") + + option_list = [ + make_option("-p", "--path", action="store", + type="string", dest="path", + default="/bluez/test/hfp"), + make_option("-n", "--name", action="store", + type="string", dest="name", + default=None), + make_option("-C", "--channel", action="store", + type="int", dest="channel", + default=None), + ] + + parser = OptionParser(option_list=option_list) + + (options, args) = parser.parse_args() + + mainloop = GObject.MainLoop() + + opts = { + "AutoConnect" : True, + "Version" : dbus.UInt16(0x0106), + "Features" : dbus.UInt16(HF_FEATURES), + } + + if (options.name): + opts["Name"] = options.name + + if (options.channel is not None): + opts["Channel"] = dbus.UInt16(options.channel) + + if audio_supported: + sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO) + sco.bind(BDADDR_ANY) + sco.listen() + else: + sco = None + + profile = HfpProfile(bus, options.path, sco) + + manager.RegisterProfile(options.path, "hfp-hf", opts) + + print("Profile registered - waiting for connections") + + mainloop.run()