OSDN Git Service

RootCanal: Add more scripts
authorMyles Watson <mylesgw@google.com>
Thu, 7 Feb 2019 23:21:25 +0000 (15:21 -0800)
committerHansong Zhang <hsz@google.com>
Thu, 28 Mar 2019 22:01:10 +0000 (15:01 -0700)
Test: Connect to HCI sockets and LinkLayer sockets
Change-Id: Ice4cee21f295bbb3a3fe35cfe44073028e65d811

vendor_libs/test_vendor_lib/scripts/hci_socket.py [new file with mode: 0644]
vendor_libs/test_vendor_lib/scripts/link_layer_socket.py [new file with mode: 0644]
vendor_libs/test_vendor_lib/scripts/send_simple_commands.py [new file with mode: 0644]
vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py [new file with mode: 0644]
vendor_libs/test_vendor_lib/scripts/simple_stack.py [new file with mode: 0644]

diff --git a/vendor_libs/test_vendor_lib/scripts/hci_socket.py b/vendor_libs/test_vendor_lib/scripts/hci_socket.py
new file mode 100644 (file)
index 0000000..0c425af
--- /dev/null
@@ -0,0 +1,441 @@
+#
+# Copyright 2015 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Script for sending data to a port.
+
+This script provides a simple shell interface for sending data at run-time to a
+port.
+
+Usage:
+    1. Choose a port to use. Use 'adb forward tcp:<port>
+    tcp:<port>' to forward the port to the device.
+    2. In a separate shell, build and push the test vendor library to the device
+    using the script mentioned in option A (i.e. without the --test-channel flag
+    set).
+    3. Once logcat has started, turn Bluetooth on from the device.
+    4. Run this program, in the shell from step 1,  the port, also from step 1,
+    as arguments.
+
+    scapy is the tool we use to build packets in Python.
+
+    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
+    Raw(load='\x01')
+    >>> print(d)
+    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
+    |>>>
+    >>> raw(d)
+    '\x01\x04\x10\x01\x01'
+    >>> hexdump(d)
+    0000  0104100101                       .....
+
+
+    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
+    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
+    >>> pkt
+    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
+    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
+    type=FEAT_MASK |>>>>>
+    >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
+    len=10) / L2CAP_Hdr(len=6, cid='control') / L2CAP_CmdHdr(code='info_req',
+    id=2, len=2) / L2CAP_InfoReq(type='FEAT_MASK')
+    >>> raw(pkt)
+    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
+    >>> hexdump(pkt)
+    0000  0202200A00060001000A0202000200   .. ............
+
+
+"""
+
+#!/usr/bin/env python
+
+import binascii
+import cmd
+import queue
+import random
+import socket
+import string
+import struct
+import sys
+from scapy.all import *
+""" Add some more SCAPY stuff"""
+
+
+class HCI_Cmd_Create_Connection(Packet):
+  name = 'Create Connection'
+  fields_desc = [
+      LEMACField('addr', None),
+      LEShortField('packet_type', 8),
+      ByteEnumField('page_scan_repetition_mode', 0, {
+          0: 'R0',
+          1: 'R1',
+          2: 'R2'
+      }),
+      ByteEnumField('rsvd', 0, {0: 'Reserved'}),
+      LEShortField('clock_offset', 0),
+      ByteEnumField('allow_role_switch', 1, {
+          0: 'false',
+          1: 'true'
+      }),
+  ]
+
+
+class HCI_Cmd_Inquiry(Packet):
+  name = 'Inquiry'
+  fields_desc = [
+      X3BytesField('LAP', 0x9e8b0),
+      ByteField('length', 1),
+      ByteField('max_responses', 0),
+  ]
+
+
+bind_layers(HCI_Command_Hdr, HCI_Cmd_Inquiry, opcode=0x0401)
+bind_layers(HCI_Command_Hdr, HCI_Cmd_Create_Connection, opcode=0x0405)
+
+
+class HCI_Event_Inquiry_Result(Packet):
+  name = 'Inquiry Result'
+  fields_desc = [
+      ByteField('num_responses', 0),
+      LEMACField('addr', None),
+      ByteEnumField('page_scan_repetition_mode', 0, {
+          0: 'R0',
+          1: 'R1',
+          2: 'R2'
+      }),
+      LEShortEnumField('rsvd', 0, {0: 'Reserved'}),
+      X3BytesField('class_of_device', 0),
+      LEShortField('clock_offset', 0),
+  ]
+
+
+class HCI_Event_Connection_Complete(Packet):
+  name = 'Connection Complete'
+  fields_desc = [
+      ByteField('status', 0),
+      LEShortField('handle', 0xffff),
+      LEMACField('addr', None),
+      ByteField('link_type', 1),
+      ByteField('encryption_mode', 0),
+  ]
+
+
+class HCI_Event_Remote_Name_Request_Complete(Packet):
+  name = 'Remote Name Request Complete'
+  fields_desc = [
+      ByteField('status', 0),
+      LEMACField('addr', None),
+  ]
+
+
+class HCI_Event_Read_Remote_Supported_Features_Complete(Packet):
+  name = 'Read Remote Supported Features Complete'
+  fields_desc = [
+      ByteField('status', 0),
+      LEShortField('handle', 0xffff),
+      XLELongField('features', 0x0123456789abcdef),
+  ]
+
+
+class HCI_Event_Read_Remote_Version_Information_Complete(Packet):
+  name = 'Read Remote Version Information Complete'
+  fields_desc = [
+      ByteField('status', 0),
+      LEShortField('handle', 0xffff),
+      ByteField('version', 0),
+      LEShortField('manufacturer_name', 0),
+      LEShortField('subversion', 0),
+  ]
+
+
+class HCI_Event_Read_Clock_Offset_Complete(Packet):
+  name = 'Read Clock Offset Complete'
+  fields_desc = [
+      ByteField('status', 0),
+      LEShortField('handle', 0xffff),
+      LEShortField('offset', 0xffff),
+  ]
+
+
+class HCI_Event_Read_Remote_Extended_Features_Complete(Packet):
+  name = 'Read Remote Supported Features Complete'
+  fields_desc = [
+      ByteField('status', 0),
+      LEShortField('handle', 0xffff),
+      ByteField('page_number', 0),
+      ByteField('max_page_number', 0),
+      XLELongField('features', 0x0123456789abcdef),
+  ]
+
+
+class HCI_Event_Extended_Inquiry_Result(Packet):
+  name = 'Extended Inquiry Result'
+  fields_desc = [
+      ByteField('num_responses', 1),
+      LEMACField('addr', None),
+      ByteEnumField('page_scan_repetition_mode', 0, {
+          0: 'R0',
+          1: 'R1',
+          2: 'R2'
+      }),
+      ByteEnumField('rsvd', 0, {0: 'Reserved'}),
+      X3BytesField('class_of_device', 0),
+      LEShortField('clock_offset', 0),
+      SignedByteField('rssi', -20),
+      PacketListField('extended_inquiry_response', [], EIR_Hdr, 1),
+  ]
+
+
+bind_layers(HCI_Event_Hdr, HCI_Event_Inquiry_Result, code=0x02)
+bind_layers(HCI_Event_Hdr, HCI_Event_Connection_Complete, code=0x03)
+bind_layers(HCI_Event_Hdr, HCI_Event_Remote_Name_Request_Complete, code=0x07)
+bind_layers(
+    HCI_Event_Hdr, HCI_Event_Read_Remote_Supported_Features_Complete, code=0x0b)
+bind_layers(
+    HCI_Event_Hdr,
+    HCI_Event_Read_Remote_Version_Information_Complete,
+    code=0x0c)
+bind_layers(HCI_Event_Hdr, HCI_Event_Read_Clock_Offset_Complete, code=0x1c)
+bind_layers(
+    HCI_Event_Hdr, HCI_Event_Read_Remote_Extended_Features_Complete, code=0x23)
+bind_layers(HCI_Event_Hdr, HCI_Event_Extended_Inquiry_Result, code=0x2f)
+""" END SCAPY stuff"""
+
+
+class HCISocket(SuperSocket):
+  """Simple wrapper class for a socket object.
+
+  Attributes:
+    socket: The underlying socket created for the specified address and port.
+  """
+
+  def __init__(self, port):
+    self.done_ = False
+    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    s.connect(('localhost', port))
+    self.ins = self.outs = s
+    self.packets_ = queue.Queue()
+    self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
+    self.rx_thread_.start()
+
+  def rx_bytes(self, size):
+    while not self.done_:
+      raw_bytes = b''
+      while len(raw_bytes) < size and not self.done_:
+        more_raw_bytes = self.ins.recv(min(size - len(raw_bytes), 2048))
+        if more_raw_bytes:
+          raw_bytes += more_raw_bytes
+      return raw_bytes
+
+  def rx_thread_body(self):
+    while not self.done_:
+      payload_length = 0
+      # Read the type
+      type_byte = self.rx_bytes(1)
+      if not type_byte:
+        continue
+      # Read the Header
+      header = b''
+      if type_byte == b'\x01':  # Command
+        header = self.rx_bytes(3)
+        if not header:
+          continue
+        payload_length = header[2]
+      elif type_byte == b'\x02':  # ACL
+        header = self.rx_bytes(4)
+        if not header:
+          continue
+        payload_length = header[3] << 8
+        payload_length |= header[2]
+      elif type_byte == b'\x03':  # SCO
+        header = self.rx_bytes(3)
+        if not header:
+          continue
+        payload_length = header[2]
+      elif type_byte == b'\x04':  # Event
+        header = self.rx_bytes(2)
+        if not header:
+          continue
+        payload_length = header[1]
+      else:
+        self.done_ = True
+        print('Rx: type_byte ' + hex(type_byte[0]))
+      # Read the Payload
+      payload = self.rx_bytes(payload_length) if payload_length != 0 else b''
+      packet_bytes = type_byte + header + payload
+      packet = HCI_Hdr(packet_bytes)
+      print('Rx: ' + packet.__repr__())
+      self.packets_.put(packet)
+
+  def get_packet(self):
+    if self.packets_.empty():
+      return False
+    return self.packets_.get()
+
+  def tell_rx_thread_to_quit(self):
+    self.done_ = True
+    self.rx_thread_.join()
+
+
+class HCIShell(cmd.Cmd):
+  """Shell for sending binary data to a port.
+
+  """
+
+  def __init__(self, hci):
+    cmd.Cmd.__init__(self)
+    self._hci = hci
+
+  def do_send(self, args):
+    """Arguments: dev_type_str Add a new device of type dev_type_str.
+
+    """
+    self._hci.send_binary(args.split())
+
+  def do_connect(self, args):
+    """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds)
+
+    """
+    split_args = args.split()
+    address = split_args[0] if len(split_args) > 0 else 'NULL'
+    timeout = int(split_args[1]) if len(split_args) > 1 else 2
+    num_responses = 0
+    connect = HCI_Hdr(type='Command') / HCI_Command_Hdr(
+        opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address)
+    self._hci.send(connect)
+    status = None
+    while status == None:
+      response = self._hci.get_packet()
+      if response == False:
+        continue
+      if response[HCI_Hdr].type == HCI_Hdr(
+          type='Event'
+      ).type and response[HCI_Event_Hdr].code == 0xf and response[
+          HCI_Event_Command_Status].opcode == connect[HCI_Command_Hdr].opcode:
+        status = response[HCI_Event_Command_Status].status
+    if status != HCI_Event_Command_Status(status='pending').status:
+      print('Connection failed with status = ' + str(status))
+      return
+
+    handle = None
+    while handle == None:
+      connection_complete = self._hci.get_packet()
+      if connection_complete == False:
+        continue
+      if (connection_complete[HCI_Hdr].type == HCI_Hdr(type='Event').type) and (
+          connection_complete[HCI_Event_Hdr].code == 0x3):
+        status = connection_complete[HCI_Event_Connection_Complete].status
+        if status != 0:
+          print('Connection complete with failed status = ' + str(status))
+          return
+        handle = connection_complete[HCI_Event_Connection_Complete].handle
+        print('Connection established with handle ' + str(handle))
+        connection_complete.show()
+        hexdump(connection_complete)
+
+    l2cap_done = False
+    while l2cap_done == None:
+      l2cap_req = self._hci.get_packet()
+      if l2cap_req == False:
+        continue
+      if (l2cap_req[HCI_Hdr].type == HCI_Hdr(type='ACL Data').type) and (
+          l2cap_req[L2CAP_Hdr].cid == L2CAP_Hdr(cid='control').cid) and (
+              l2cap_req[L2CAP_CmdHdr].code == L2CAP_CmdHdr(code='info_req').code
+          ) and (l2cap_req[L2CAP_InfoReq].type == L2CAP_InfoReq(
+              type='FEAT_MASK').type):
+        print('Send Features packet' + HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(
+            handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
+              L2CAP_Hdr(len=12, cid='control') /
+              L2CAP_CmdHdr(code='info_resp', id=146, len=8) / L2CAP_InfoResp(
+                  type=l2cap_req[L2CAP_InfoResp].type,
+                  result='success',
+                  data=b'\xb8\x00\x00\x00').__repr__())
+        self._hci.send(
+            HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(
+                handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) /
+            L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr(
+                code='info_resp', id=146, len=8) / L2CAP_InfoResp(
+                    type=l2cap_req[L2CAP_InfoResp].type,
+                    result='success',
+                    data=b'\xb8\x00\x00\x00'))
+
+  def do_le_scan(self, args):
+    """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices
+
+    """
+    split_args = args.split()
+    enable = int(split_args[0]) if len(split_args) > 0 else 1
+    filter_dups = int(split_args[1]) if len(split_args) > 1 else 1
+    set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr(
+        opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable(
+            enable=enable, filter_dups=filter_dups)
+    print('Tx: ' + set_scan_enable.__repr__())
+    self._hci.send(set_scan_enable)
+
+  def do_scan(self, args):
+    """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices
+
+    """
+    split_args = args.split()
+    scan_time = int(split_args[0]) if len(split_args) > 0 else 0
+    max_responses = int(split_args[1]) if len(split_args) > 1 else 0
+    num_responses = 0
+    inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr(
+        opcode=0x0401) / HCI_Cmd_Inquiry(
+            length=scan_time, max_responses=max_responses)
+    print('Tx: ' + inquiry.__repr__())
+    self._hci.send(inquiry)
+
+  def do_quit(self, args):
+    """Arguments: None.
+
+    Exits.
+    """
+    self._hci.tell_rx_thread_to_quit()
+    self._hci.close()
+    print('Goodbye.')
+    return True
+
+  def do_help(self, args):
+    """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+
+    """
+    if (len(args) == 0):
+      cmd.Cmd.do_help(self, args)
+
+
+def main(argv):
+  if len(argv) != 2:
+    print('Usage: python hci_socket.py [port]')
+    return
+  try:
+    port = int(argv[1])
+  except ValueError:
+    print('Error parsing port.')
+  else:
+    try:
+      hci = HCISocket(port)
+    except socket.error as e:
+      print('Error connecting to socket: %s' % e)
+    except:
+      print('Error creating (check arguments).')
+    else:
+      hci_shell = HCIShell(hci)
+      hci_shell.prompt = '$ '
+      hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' +
+                        'Type \'help\' for more information.')
+
+
+if __name__ == '__main__':
+  main(sys.argv)
diff --git a/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py b/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py
new file mode 100644 (file)
index 0000000..c7b3045
--- /dev/null
@@ -0,0 +1,192 @@
+#
+# Copyright 2015 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Script for sending data to a port.
+
+This script provides a simple shell interface for sending data at run-time to a
+port.
+
+Usage:
+    1. Choose a port to use. Use 'adb forward tcp:<port>
+    tcp:<port>' to forward the port to the device.
+    2. In a separate shell, build and push the test vendor library to the device
+    using the script mentioned in option A (i.e. without the --test-channel flag
+    set).
+    3. Once logcat has started, turn Bluetooth on from the device.
+    4. Run this program, in the shell from step 1,  the port, also from step 1,
+    as arguments.
+
+    scapy is the tool we use to build packets in Python.
+
+    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
+    Raw(load='\x01')
+    >>> print(d)
+    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
+    |>>>
+    >>> raw(d)
+    '\x01\x04\x10\x01\x01'
+    >>> hexdump(d)
+    0000  0104100101                       .....
+
+
+    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
+    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
+    >>> pkt
+    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
+    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
+    type=FEAT_MASK |>>>>>
+    >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
+    len=10) / L2CAP_Hdr(len=6, cid='control') / L2CAP_CmdHdr(code='info_req',
+    id=2, len=2) / L2CAP_InfoReq(type='FEAT_MASK')
+    >>> raw(pkt)
+    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
+    >>> hexdump(pkt)
+    0000  0202200A00060001000A0202000200   .. ............
+
+
+"""
+
+#!/usr/bin/env python
+
+import binascii
+import cmd
+import queue
+import random
+import socket
+import string
+import struct
+import sys
+
+class LinkLayerSocket(object):
+  """Simple wrapper class for a socket object.
+
+  Attributes:
+    socket: The underlying socket created for the specified address and port.
+  """
+
+  def __init__(self, port):
+    print('port = ' + port)
+    self.done_ = False
+    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self._socket.connect(('localhost', port))
+    # Should it be a non-blocking socket?
+    # self._socket.setblocking(0)
+    self.packets_ = queue.Queue()
+    self.rx_thread_ = threading.Thread(target=self.rx_thread_body)
+    self.rx_thread_.start()
+
+  def rx_bytes(self, size):
+    while not self.done_:
+      raw_bytes = b''
+      while len(raw_bytes) < size and not self.done_:
+        more_raw_bytes = self._socket.recv(min(size - len(raw_bytes), 2048))
+        if more_raw_bytes:
+          raw_bytes += more_raw_bytes
+      return raw_bytes
+
+  def rx_thread_body(self):
+    while not self.done_:
+      payload_length = 0
+      # Read the size (4B), the type (1B), and the addresses (2*6B)
+      header = self.rx_bytes(17)
+      if not header:
+        continue
+      payload_length = header[0]
+      payload_length |= header[1] << 8
+      payload_length |= header[2] << 16
+      payload_length |= header[3] << 24
+      print('Rx: type_byte ' + hex(header[4]))
+      print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' + hex(header[7]) + ':' + hex(header[8]) + ':' + hex(header[9]) + ':' + hex(header[10]))
+      print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' + hex(header[13]) + ':' + hex(header[14]) + ':' + hex(header[15]) + ':' + hex(header[16]))
+      # Read the Payload
+      payload = self.rx_bytes(payload_length) if payload_length != 0 else b''
+      packet_bytes = header + payload
+      self.packets_.put(packet_bytes)
+
+  def get_packet(self):
+    if self.packets_.empty():
+      return False
+    return self.packets_.get()
+
+  def send_binary(self, args):
+    joined_args = ''.join(arg for arg in args)
+    print(joined_args)
+    packet = binascii.a2b_hex(joined_args)
+    if self._done:
+      return
+    self._connection.send(packet)
+
+  def tell_rx_thread_to_quit(self):
+    self.done_ = True
+    self.rx_thread_.join()
+
+
+class LinkLayerShell(cmd.Cmd):
+  """Shell for sending binary data to a port.
+
+  """
+
+  def __init__(self, link_layer):
+    cmd.Cmd.__init__(self)
+    self._link_layer = link_layer
+
+  def do_send(self, args):
+    """Arguments: binary representation of a packet.
+
+    """
+    self._link_layer.send_binary(args.split())
+
+  def do_quit(self, args):
+    """Arguments: None.
+
+    Exits.
+    """
+    self._link_layer.tell_rx_thread_to_quit()
+    self._link_layer.close()
+    print('Goodbye.')
+    return True
+
+  def do_help(self, args):
+    """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+
+    """
+    if (len(args) == 0):
+      cmd.Cmd.do_help(self, args)
+
+
+def main(argv):
+  if len(argv) != 2:
+    print('Usage: python link_layer_socket.py [port]')
+    return
+  try:
+    port = int(argv[1])
+  except ValueError:
+    print('Error parsing port.')
+  else:
+    try:
+      link_layer = LinkLayerSocket(port)
+    except socket.error as e:
+      print('Error connecting to socket: %s' % e)
+    except:
+      print('Error creating (check arguments).')
+    else:
+      link_layer_shell = LinkLayerShell(link_layer)
+      link_layer_shell.prompt = '$ '
+      link_layer_shell.cmdloop('Welcome to the RootCanal LinkLayer Console \n' +
+                        'Type \'help\' for more information.')
+
+
+if __name__ == '__main__':
+  main(sys.argv)
diff --git a/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py b/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py
new file mode 100644 (file)
index 0000000..cadeab3
--- /dev/null
@@ -0,0 +1,220 @@
+#
+# Copyright 2015 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Script for sending data to a port.
+
+This script provides a simple shell interface for sending data at run-time to a
+port.
+
+Usage:
+    1. Choose a port to use. Use 'adb forward tcp:<port>
+    tcp:<port>' to forward the port to the device.
+    2. In a separate shell, build and push the test vendor library to the device
+    using the script mentioned in option A (i.e. without the --test-channel flag
+    set).
+    3. Once logcat has started, turn Bluetooth on from the device.
+    4. Run this program, in the shell from step 1,  the port, also from step 1,
+    as arguments.
+
+    scapy is the tool we use to build packets in Python.
+
+    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
+    Raw(load='\x01')
+    >>> print(d)
+    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
+    |>>>
+    >>> raw(d)
+    '\x01\x04\x10\x01\x01'
+    >>> hexdump(d)
+    0000  0104100101                       .....
+
+
+    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
+    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
+    >>> pkt
+    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
+    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
+    type=FEAT_MASK |>>>>>
+    >>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
+    len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req",
+    id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK")
+    >>> raw(pkt)
+    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
+    >>> hexdump(pkt)
+    0000  0202200A00060001000A0202000200   .. ............
+
+
+"""
+
+#!/usr/bin/env python
+
+import binascii
+import cmd
+import random
+import socket
+import string
+import struct
+import sys
+from scapy.all import *
+
+DEVICE_NAME_LENGTH = 6
+DEVICE_ADDRESS_LENGTH = 6
+
+
+# Used to generate fake device names and addresses during discovery.
+def generate_random_name():
+  return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
+    string.digits) for _ in range(DEVICE_NAME_LENGTH))
+
+
+def generate_random_address():
+  return ''.join(random.SystemRandom().choice(string.digits) for _ in \
+    range(DEVICE_ADDRESS_LENGTH))
+
+
+class Connection(object):
+  """Simple wrapper class for a socket object.
+
+  Attributes:
+    socket: The underlying socket created for the specified address and port.
+  """
+
+  def __init__(self, port):
+    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self._socket.connect(('localhost', port))
+    self._socket.setblocking(0)
+
+  def close(self):
+    self._socket.close()
+
+  def send(self, data):
+    self._socket.sendall(data)
+
+  def receive(self, size):
+    return self._socket.recv(size)
+
+
+class RawPort(object):
+  """Checks outgoing commands and sends them once verified.
+
+  Attributes:
+    connection: The connection to the HCI port.
+  """
+
+  def __init__(self, port):
+    self._connection = Connection(port)
+    self._closed = False
+
+  def close(self):
+    self._connection.close()
+    self._closed = True
+
+  def send_binary(self, args):
+    joined_args = ''.join(arg for arg in args)
+    print(joined_args)
+    packet = binascii.a2b_hex(joined_args)
+    if self._closed:
+      return
+    self._connection.send(packet)
+    received = self.receive_response()
+    received_bytes = bytearray(received)
+    print(raw(received_bytes))
+
+  def receive_response(self):
+    if self._closed:
+      return
+    size_chars = self._connection.receive(4)
+    if not size_chars:
+      print('Debug: No response')
+      return False
+    size_bytes = bytearray(size_chars)
+    response_size = 0
+    for i in range(0, len(size_chars) - 1):
+      response_size |= ord(size_chars[i]) << (8 * i)
+    response = self._connection.receive(response_size)
+    return response
+
+  def lint_command(self, name, args, name_size, args_size):
+    assert name_size == len(name) and args_size == len(args)
+    try:
+      name.encode('utf-8')
+      for arg in args:
+        arg.encode('utf-8')
+    except UnicodeError:
+      print('Unrecognized characters.')
+      raise
+    if name_size > 255 or args_size > 255:
+      raise ValueError  # Size must be encodable in one octet.
+    for arg in args:
+      if len(arg) > 255:
+        raise ValueError  # Size must be encodable in one octet.
+
+
+class RawPortShell(cmd.Cmd):
+  """Shell for sending binary data to a port.
+
+  """
+
+  def __init__(self, raw_port):
+    cmd.Cmd.__init__(self)
+    self._raw_port = raw_port
+
+  def do_send(self, args):
+    """Arguments: dev_type_str Add a new device of type dev_type_str.
+
+    """
+    self._raw_port.send_binary(args.split())
+
+  def do_quit(self, args):
+    """Arguments: None.
+
+    Exits.
+    """
+    self._raw_port.close()
+    print('Goodbye.')
+    return True
+
+  def do_help(self, args):
+    """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+
+    """
+    if (len(args) == 0):
+      cmd.Cmd.do_help(self, args)
+
+
+def main(argv):
+  if len(argv) != 2:
+    print('Usage: python raw_port.py [port]')
+    return
+  try:
+    port = int(argv[1])
+  except ValueError:
+    print('Error parsing port.')
+  else:
+    try:
+      raw_port = RawPort(port)
+    except (socket.error, e):
+      print('Error connecting to socket: %s' % e)
+    except:
+      print('Error creating (check arguments).')
+    else:
+      raw_port_shell = RawPortShell(raw_port)
+      raw_port_shell.prompt = '$ '
+      raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' +
+                             'Type \'help\' for more information.')
+
+
+if __name__ == '__main__':
+  main(sys.argv)
diff --git a/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py b/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py
new file mode 100644 (file)
index 0000000..982aee2
--- /dev/null
@@ -0,0 +1,197 @@
+#
+# Copyright 2018 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Script for sending data to a port.
+
+This script provides a simple shell interface for sending data at run-time to a
+port.
+
+Usage:
+    1. Choose a port to use. Use 'adb forward tcp:<port>
+    tcp:<port>' to forward the port to the device.
+    2. In a separate shell, build and push the test vendor library to the device
+    using the script mentioned in option A (i.e. without the --test-channel flag
+    set).
+    3. Once logcat has started, turn Bluetooth on from the device.
+    4. Run this program, in the shell from step 1,  the port, also from step 1,
+    as arguments.
+
+
+     length   type   source    dest      addr_type  adv_type     EIR Data
+
+              adv                          public  connectable  Name (TATA)
+send 18000000 07 010203040506 000000000000   00      00         050954415441
+                                                                020106
+
+     length   adv   source    dest        public   non-conn     Name (TETE)
+send 18000000 07 010203040507 000000000000   00       03        050954455445
+                                                                020106
+
+     length scan_rsp  source    dest        public  scan_rsp    Name (TFTF)
+send 18000000 09 010203040506 4de24c67454b    00      04        050954465446
+                                                                020106
+
+"""
+
+#!/usr/bin/env python
+
+import binascii
+import cmd
+import random
+import socket
+import string
+import struct
+import sys
+
+DEVICE_NAME_LENGTH = 6
+DEVICE_ADDRESS_LENGTH = 6
+
+
+# Used to generate fake device names and addresses during discovery.
+def generate_random_name():
+  return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
+    string.digits) for _ in range(DEVICE_NAME_LENGTH))
+
+
+def generate_random_address():
+  return ''.join(random.SystemRandom().choice(string.digits) for _ in \
+    range(DEVICE_ADDRESS_LENGTH))
+
+
+class Connection(object):
+  """Simple wrapper class for a socket object.
+
+  Attributes:
+    socket: The underlying socket created for the specified address and port.
+  """
+
+  def __init__(self, port):
+    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self._socket.connect(('localhost', port))
+    self._socket.setblocking(0)
+
+  def close(self):
+    self._socket.close()
+
+  def send(self, data):
+    self._socket.sendall(data)
+
+  def receive(self, size):
+    return self._socket.recv(size)
+
+
+class RawPort(object):
+  """Checks outgoing commands and sends them once verified.
+
+  Attributes:
+    connection: The connection to the HCI port.
+  """
+
+  def __init__(self, port):
+    self._connection = Connection(port)
+    self._closed = False
+
+  def close(self):
+    self._connection.close()
+    self._closed = True
+
+  def send_binary(self, args):
+    joined_args = ''.join(arg for arg in args)
+    print(joined_args)
+    packet = binascii.a2b_hex(joined_args)
+    if self._closed:
+      return
+    self._connection.send(packet)
+
+  def receive_response(self):
+    if self._closed:
+      return
+    size_chars = self._connection.receive(4)
+    if not size_chars:
+      print('Debug: No response')
+      return False
+    size_bytes = bytearray(size_chars)
+    response_size = 0
+    for i in range(0, len(size_chars) - 1):
+      response_size |= ord(size_chars[i]) << (8 * i)
+    response = self._connection.receive(response_size)
+    return response
+
+  def lint_command(self, name, args, name_size, args_size):
+    assert name_size == len(name) and args_size == len(args)
+    try:
+      name.encode('utf-8')
+      for arg in args:
+        arg.encode('utf-8')
+    except UnicodeError:
+      print('Unrecognized characters.')
+      raise
+    if name_size > 255 or args_size > 255:
+      raise ValueError  # Size must be encodable in one octet.
+    for arg in args:
+      if len(arg) > 255:
+        raise ValueError  # Size must be encodable in one octet.
+
+
+class RawPortShell(cmd.Cmd):
+  """Shell for sending binary data to a port."""
+
+  def __init__(self, raw_port):
+    cmd.Cmd.__init__(self)
+    self._raw_port = raw_port
+
+  def do_send(self, args):
+    """Arguments: dev_type_str Add a new device of type dev_type_str."""
+    self._raw_port.send_binary(args.split())
+
+  def do_quit(self, args):
+    """Arguments: None.
+
+    Exits.
+    """
+    self._raw_port.close()
+    print('Goodbye.')
+    return True
+
+  def do_help(self, args):
+    """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr."""
+    if (len(args) == 0):
+      cmd.Cmd.do_help(self, args)
+
+
+def main(argv):
+  if len(argv) != 2:
+    print('Usage: python raw_port.py [port]')
+    return
+  try:
+    port = int(argv[1])
+  except ValueError:
+    print('Error parsing port.')
+  else:
+    try:
+      raw_port = RawPort(port)
+    except (socket.error, e):
+      print('Error connecting to socket: %s' % e)
+    except:
+      print('Error creating (check arguments).')
+    else:
+      raw_port_shell = RawPortShell(raw_port)
+      raw_port_shell.prompt = '$ '
+      raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' +
+                             'Type \'help\' for more information.')
+
+
+if __name__ == '__main__':
+  main(sys.argv)
diff --git a/vendor_libs/test_vendor_lib/scripts/simple_stack.py b/vendor_libs/test_vendor_lib/scripts/simple_stack.py
new file mode 100644 (file)
index 0000000..4f42bf2
--- /dev/null
@@ -0,0 +1,238 @@
+#
+# Copyright 2015 The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Script for sending data to a port.
+
+This script provides a simple shell interface for sending data at run-time to a
+port.
+
+Usage:
+    1. Choose a port to use. Use 'adb forward tcp:<port>
+    tcp:<port>' to forward the port to the device.
+    2. In a separate shell, build and push the test vendor library to the device
+    using the script mentioned in option A (i.e. without the --test-channel flag
+    set).
+    3. Once logcat has started, turn Bluetooth on from the device.
+    4. Run this program, in the shell from step 1,  the port, also from step 1,
+    as arguments.
+
+    scapy is the tool we use to build packets in Python.
+
+    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
+    Raw(load='\x01')
+    >>> print(d)
+    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
+    |>>>
+    >>> raw(d)
+    '\x01\x04\x10\x01\x01'
+    >>> hexdump(d)
+    0000  0104100101                       .....
+
+
+    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
+    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
+    >>> pkt
+    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
+    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
+    type=FEAT_MASK |>>>>>
+    >>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
+    len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req",
+    id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK")
+    >>> raw(pkt)
+    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
+    >>> hexdump(pkt)
+    0000  0202200A00060001000A0202000200   .. ............
+
+
+"""
+
+#!/usr/bin/env python
+
+import binascii
+import cmd
+import random
+import socket
+import string
+import struct
+import sys
+from scapy.all import *
+""" Add some more SCAPY stuff"""
+
+
+class HCI_Cmd_Connect(Packet):
+  name = "Connect"
+  fields_desc = [
+      ByteEnumField("filter", 0, {0: "address"}),
+      LEShortField("packet_type", 8),
+      ByteEnumField("page_scan_repetition_mode", 0, {
+          0: "R0",
+          1: "R1",
+          2: "R2"
+      }),
+      ByteEnumField("page_scan_repetition_mode", 0, {0: "Reserved"}),
+      LEShortField("clock_offset", 0),
+      ByteEnumField("allow_role_switch", 0, {
+          0: "false",
+          1: "true"
+      }),
+  ]
+
+
+class HCI_Cmd_Inquiry(Packet):
+  name = "Inquiry"
+  fields_desc = [
+      XByteField("LAP0", 0),
+      XByteField("LAP1", 0x8B),
+      XByteField("LAP2", 0x9E),
+      ByteField("length", 1),
+      ByteField("max_responses", 0),
+  ]
+
+
+""" END SCAPY stuff"""
+
+
+class Connection(object):
+  """Simple wrapper class for a socket object.
+
+  Attributes:
+    socket: The underlying socket created for the specified address and port.
+  """
+
+  def __init__(self, port):
+    self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    self._socket.connect(("localhost", port))
+
+  def close(self):
+    self._socket.close()
+
+  def send(self, data):
+    self._socket.sendall(data)
+
+  def receive(self, size):
+    return self._socket.recv(size)
+
+
+class RawPort(object):
+  """Converts outgoing packets to binary and sends them.
+
+  Attributes:
+    connection: The connection to the HCI port.
+  """
+
+  def __init__(self, port):
+    self._connection = Connection(port)
+
+  def close(self):
+    self._connection.close()
+
+  def send_binary(self, args):
+    joined_args = "".join(arg for arg in args)
+    print(joined_args)
+    packet = binascii.a2b_hex(joined_args)
+    self._connection.send(packet)
+
+  def receive_response(self):
+    ready_to_read, ready_to_write, in_error = \
+               select.select(
+                  [ self._connection._socket ],
+                  [ ],
+                  [ self._connection._socket ],
+                  1.5)
+    if len(in_error) > 0:
+      print("Error")
+      return False
+    if len(ready_to_read) > 0:
+      print("Ready to Read")
+      type_str = self._connection.receive(512)
+      print(len(type_str))
+      print(type_str)
+      return type_str
+    print("Returning false at the end")
+    return False
+
+
+class RawPortShell(cmd.Cmd):
+  """Shell for sending binary data to a port.
+
+  """
+
+  def __init__(self, raw_port):
+    cmd.Cmd.__init__(self)
+    self._raw_port = raw_port
+
+  def do_send(self, args):
+    """Arguments: dev_type_str Add a new device of type dev_type_str.
+
+    """
+    self._raw_port.send_binary(args.split())
+
+  def do_scan(self, args):
+    """Arguments: timeout (seconds) Print the scan responses from reachable devices
+
+    """
+    self._raw_port.send_binary(args.split())
+
+  def do_quit(self, args):
+    """Arguments: None.
+
+    Exits.
+    """
+    self._raw_port.close()
+    print("Goodbye.")
+    return True
+
+  def do_help(self, args):
+    """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
+
+    """
+    if (len(args) == 0):
+      cmd.Cmd.do_help(self, args)
+
+  def postcmd(self, stop, line):
+    """Called after each command stop : whether we will stop after this command line : the previous input line Return True to stop, False to continue
+
+    """
+    if stop:
+      return True
+    response = self._raw_port.receive_response()
+    print(response)
+    return False
+
+
+def main(argv):
+  if len(argv) != 2:
+    print("Usage: python raw_port.py [port]")
+    return
+  try:
+    port = int(argv[1])
+  except ValueError:
+    print("Error parsing port.")
+  else:
+    try:
+      raw_port = RawPort(port)
+    except (socket.error, e):
+      print("Error connecting to socket: %s" % e)
+    except:
+      print("Error creating (check arguments).")
+    else:
+      raw_port_shell = RawPortShell(raw_port)
+      raw_port_shell.prompt = "$ "
+      raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" +
+                             'Type \'help\' for more information.')
+
+
+if __name__ == "__main__":
+  main(sys.argv)