From f835af01b1fa78f4c2772f6ab43438be7cc01763 Mon Sep 17 00:00:00 2001 From: Myles Watson Date: Thu, 7 Feb 2019 15:21:25 -0800 Subject: [PATCH] RootCanal: Add more scripts Test: Connect to HCI sockets and LinkLayer sockets Change-Id: Ice4cee21f295bbb3a3fe35cfe44073028e65d811 --- vendor_libs/test_vendor_lib/scripts/hci_socket.py | 441 +++++++++++++++++++++ .../test_vendor_lib/scripts/link_layer_socket.py | 192 +++++++++ .../scripts/send_simple_commands.py | 220 ++++++++++ .../scripts/simple_link_layer_socket.py | 197 +++++++++ .../test_vendor_lib/scripts/simple_stack.py | 238 +++++++++++ 5 files changed, 1288 insertions(+) create mode 100644 vendor_libs/test_vendor_lib/scripts/hci_socket.py create mode 100644 vendor_libs/test_vendor_lib/scripts/link_layer_socket.py create mode 100644 vendor_libs/test_vendor_lib/scripts/send_simple_commands.py create mode 100644 vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py create mode 100644 vendor_libs/test_vendor_lib/scripts/simple_stack.py diff --git a/vendor_libs/test_vendor_lib/scripts/hci_socket.py b/vendor_libs/test_vendor_lib/scripts/hci_socket.py new file mode 100644 index 000000000..0c425af93 --- /dev/null +++ b/vendor_libs/test_vendor_lib/scripts/hci_socket.py @@ -0,0 +1,441 @@ +# +# Copyright 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Script for sending data to a port. + +This script provides a simple shell interface for sending data at run-time to a +port. + +Usage: + 1. Choose a port to use. Use 'adb forward tcp: + tcp:' to forward the port to the device. + 2. In a separate shell, build and push the test vendor library to the device + using the script mentioned in option A (i.e. without the --test-channel flag + set). + 3. Once logcat has started, turn Bluetooth on from the device. + 4. Run this program, in the shell from step 1, the port, also from step 1, + as arguments. + + scapy is the tool we use to build packets in Python. + + >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) / + Raw(load='\x01') + >>> print(d) + >> + >>> raw(d) + '\x01\x04\x10\x01\x01' + >>> hexdump(d) + 0000 0104100101 ..... + + + >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') / + L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2) + >>> pkt + >>>> + >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2, + len=10) / L2CAP_Hdr(len=6, cid='control') / L2CAP_CmdHdr(code='info_req', + id=2, len=2) / L2CAP_InfoReq(type='FEAT_MASK') + >>> raw(pkt) + '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00' + >>> hexdump(pkt) + 0000 0202200A00060001000A0202000200 .. ............ + + +""" + +#!/usr/bin/env python + +import binascii +import cmd +import queue +import random +import socket +import string +import struct +import sys +from scapy.all import * +""" Add some more SCAPY stuff""" + + +class HCI_Cmd_Create_Connection(Packet): + name = 'Create Connection' + fields_desc = [ + LEMACField('addr', None), + LEShortField('packet_type', 8), + ByteEnumField('page_scan_repetition_mode', 0, { + 0: 'R0', + 1: 'R1', + 2: 'R2' + }), + ByteEnumField('rsvd', 0, {0: 'Reserved'}), + LEShortField('clock_offset', 0), + ByteEnumField('allow_role_switch', 1, { + 0: 'false', + 1: 'true' + }), + ] + + +class HCI_Cmd_Inquiry(Packet): + name = 'Inquiry' + fields_desc = [ + X3BytesField('LAP', 0x9e8b0), + ByteField('length', 1), + ByteField('max_responses', 0), + ] + + +bind_layers(HCI_Command_Hdr, HCI_Cmd_Inquiry, opcode=0x0401) +bind_layers(HCI_Command_Hdr, HCI_Cmd_Create_Connection, opcode=0x0405) + + +class HCI_Event_Inquiry_Result(Packet): + name = 'Inquiry Result' + fields_desc = [ + ByteField('num_responses', 0), + LEMACField('addr', None), + ByteEnumField('page_scan_repetition_mode', 0, { + 0: 'R0', + 1: 'R1', + 2: 'R2' + }), + LEShortEnumField('rsvd', 0, {0: 'Reserved'}), + X3BytesField('class_of_device', 0), + LEShortField('clock_offset', 0), + ] + + +class HCI_Event_Connection_Complete(Packet): + name = 'Connection Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + LEMACField('addr', None), + ByteField('link_type', 1), + ByteField('encryption_mode', 0), + ] + + +class HCI_Event_Remote_Name_Request_Complete(Packet): + name = 'Remote Name Request Complete' + fields_desc = [ + ByteField('status', 0), + LEMACField('addr', None), + ] + + +class HCI_Event_Read_Remote_Supported_Features_Complete(Packet): + name = 'Read Remote Supported Features Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + XLELongField('features', 0x0123456789abcdef), + ] + + +class HCI_Event_Read_Remote_Version_Information_Complete(Packet): + name = 'Read Remote Version Information Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + ByteField('version', 0), + LEShortField('manufacturer_name', 0), + LEShortField('subversion', 0), + ] + + +class HCI_Event_Read_Clock_Offset_Complete(Packet): + name = 'Read Clock Offset Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + LEShortField('offset', 0xffff), + ] + + +class HCI_Event_Read_Remote_Extended_Features_Complete(Packet): + name = 'Read Remote Supported Features Complete' + fields_desc = [ + ByteField('status', 0), + LEShortField('handle', 0xffff), + ByteField('page_number', 0), + ByteField('max_page_number', 0), + XLELongField('features', 0x0123456789abcdef), + ] + + +class HCI_Event_Extended_Inquiry_Result(Packet): + name = 'Extended Inquiry Result' + fields_desc = [ + ByteField('num_responses', 1), + LEMACField('addr', None), + ByteEnumField('page_scan_repetition_mode', 0, { + 0: 'R0', + 1: 'R1', + 2: 'R2' + }), + ByteEnumField('rsvd', 0, {0: 'Reserved'}), + X3BytesField('class_of_device', 0), + LEShortField('clock_offset', 0), + SignedByteField('rssi', -20), + PacketListField('extended_inquiry_response', [], EIR_Hdr, 1), + ] + + +bind_layers(HCI_Event_Hdr, HCI_Event_Inquiry_Result, code=0x02) +bind_layers(HCI_Event_Hdr, HCI_Event_Connection_Complete, code=0x03) +bind_layers(HCI_Event_Hdr, HCI_Event_Remote_Name_Request_Complete, code=0x07) +bind_layers( + HCI_Event_Hdr, HCI_Event_Read_Remote_Supported_Features_Complete, code=0x0b) +bind_layers( + HCI_Event_Hdr, + HCI_Event_Read_Remote_Version_Information_Complete, + code=0x0c) +bind_layers(HCI_Event_Hdr, HCI_Event_Read_Clock_Offset_Complete, code=0x1c) +bind_layers( + HCI_Event_Hdr, HCI_Event_Read_Remote_Extended_Features_Complete, code=0x23) +bind_layers(HCI_Event_Hdr, HCI_Event_Extended_Inquiry_Result, code=0x2f) +""" END SCAPY stuff""" + + +class HCISocket(SuperSocket): + """Simple wrapper class for a socket object. + + Attributes: + socket: The underlying socket created for the specified address and port. + """ + + def __init__(self, port): + self.done_ = False + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.connect(('localhost', port)) + self.ins = self.outs = s + self.packets_ = queue.Queue() + self.rx_thread_ = threading.Thread(target=self.rx_thread_body) + self.rx_thread_.start() + + def rx_bytes(self, size): + while not self.done_: + raw_bytes = b'' + while len(raw_bytes) < size and not self.done_: + more_raw_bytes = self.ins.recv(min(size - len(raw_bytes), 2048)) + if more_raw_bytes: + raw_bytes += more_raw_bytes + return raw_bytes + + def rx_thread_body(self): + while not self.done_: + payload_length = 0 + # Read the type + type_byte = self.rx_bytes(1) + if not type_byte: + continue + # Read the Header + header = b'' + if type_byte == b'\x01': # Command + header = self.rx_bytes(3) + if not header: + continue + payload_length = header[2] + elif type_byte == b'\x02': # ACL + header = self.rx_bytes(4) + if not header: + continue + payload_length = header[3] << 8 + payload_length |= header[2] + elif type_byte == b'\x03': # SCO + header = self.rx_bytes(3) + if not header: + continue + payload_length = header[2] + elif type_byte == b'\x04': # Event + header = self.rx_bytes(2) + if not header: + continue + payload_length = header[1] + else: + self.done_ = True + print('Rx: type_byte ' + hex(type_byte[0])) + # Read the Payload + payload = self.rx_bytes(payload_length) if payload_length != 0 else b'' + packet_bytes = type_byte + header + payload + packet = HCI_Hdr(packet_bytes) + print('Rx: ' + packet.__repr__()) + self.packets_.put(packet) + + def get_packet(self): + if self.packets_.empty(): + return False + return self.packets_.get() + + def tell_rx_thread_to_quit(self): + self.done_ = True + self.rx_thread_.join() + + +class HCIShell(cmd.Cmd): + """Shell for sending binary data to a port. + + """ + + def __init__(self, hci): + cmd.Cmd.__init__(self) + self._hci = hci + + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. + + """ + self._hci.send_binary(args.split()) + + def do_connect(self, args): + """Arguments: bluetooth_address xx:xx:xx:xx:xx:xx, timeout (seconds) + + """ + split_args = args.split() + address = split_args[0] if len(split_args) > 0 else 'NULL' + timeout = int(split_args[1]) if len(split_args) > 1 else 2 + num_responses = 0 + connect = HCI_Hdr(type='Command') / HCI_Command_Hdr( + opcode=0x0405) / HCI_Cmd_Create_Connection(addr=address) + self._hci.send(connect) + status = None + while status == None: + response = self._hci.get_packet() + if response == False: + continue + if response[HCI_Hdr].type == HCI_Hdr( + type='Event' + ).type and response[HCI_Event_Hdr].code == 0xf and response[ + HCI_Event_Command_Status].opcode == connect[HCI_Command_Hdr].opcode: + status = response[HCI_Event_Command_Status].status + if status != HCI_Event_Command_Status(status='pending').status: + print('Connection failed with status = ' + str(status)) + return + + handle = None + while handle == None: + connection_complete = self._hci.get_packet() + if connection_complete == False: + continue + if (connection_complete[HCI_Hdr].type == HCI_Hdr(type='Event').type) and ( + connection_complete[HCI_Event_Hdr].code == 0x3): + status = connection_complete[HCI_Event_Connection_Complete].status + if status != 0: + print('Connection complete with failed status = ' + str(status)) + return + handle = connection_complete[HCI_Event_Connection_Complete].handle + print('Connection established with handle ' + str(handle)) + connection_complete.show() + hexdump(connection_complete) + + l2cap_done = False + while l2cap_done == None: + l2cap_req = self._hci.get_packet() + if l2cap_req == False: + continue + if (l2cap_req[HCI_Hdr].type == HCI_Hdr(type='ACL Data').type) and ( + l2cap_req[L2CAP_Hdr].cid == L2CAP_Hdr(cid='control').cid) and ( + l2cap_req[L2CAP_CmdHdr].code == L2CAP_CmdHdr(code='info_req').code + ) and (l2cap_req[L2CAP_InfoReq].type == L2CAP_InfoReq( + type='FEAT_MASK').type): + print('Send Features packet' + HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr( + handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) / + L2CAP_Hdr(len=12, cid='control') / + L2CAP_CmdHdr(code='info_resp', id=146, len=8) / L2CAP_InfoResp( + type=l2cap_req[L2CAP_InfoResp].type, + result='success', + data=b'\xb8\x00\x00\x00').__repr__()) + self._hci.send( + HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr( + handle=l2cap_req[HCI_ACL_Hdr].handle, PB=0, BC=2, len=16) / + L2CAP_Hdr(len=12, cid='control') / L2CAP_CmdHdr( + code='info_resp', id=146, len=8) / L2CAP_InfoResp( + type=l2cap_req[L2CAP_InfoResp].type, + result='success', + data=b'\xb8\x00\x00\x00')) + + def do_le_scan(self, args): + """Arguments: enable (0 or 1), filter duplicates (0 or 1) Print the scan responses from reachable devices + + """ + split_args = args.split() + enable = int(split_args[0]) if len(split_args) > 0 else 1 + filter_dups = int(split_args[1]) if len(split_args) > 1 else 1 + set_scan_enable = HCI_Hdr(type=1) / HCI_Command_Hdr( + opcode=0x200c) / HCI_Cmd_LE_Set_Scan_Enable( + enable=enable, filter_dups=filter_dups) + print('Tx: ' + set_scan_enable.__repr__()) + self._hci.send(set_scan_enable) + + def do_scan(self, args): + """Arguments: timeout (seconds), max_results Print the scan responses from reachable devices + + """ + split_args = args.split() + scan_time = int(split_args[0]) if len(split_args) > 0 else 0 + max_responses = int(split_args[1]) if len(split_args) > 1 else 0 + num_responses = 0 + inquiry = HCI_Hdr(type='Command') / HCI_Command_Hdr( + opcode=0x0401) / HCI_Cmd_Inquiry( + length=scan_time, max_responses=max_responses) + print('Tx: ' + inquiry.__repr__()) + self._hci.send(inquiry) + + def do_quit(self, args): + """Arguments: None. + + Exits. + """ + self._hci.tell_rx_thread_to_quit() + self._hci.close() + print('Goodbye.') + return True + + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + + """ + if (len(args) == 0): + cmd.Cmd.do_help(self, args) + + +def main(argv): + if len(argv) != 2: + print('Usage: python hci_socket.py [port]') + return + try: + port = int(argv[1]) + except ValueError: + print('Error parsing port.') + else: + try: + hci = HCISocket(port) + except socket.error as e: + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + hci_shell = HCIShell(hci) + hci_shell.prompt = '$ ' + hci_shell.cmdloop('Welcome to the RootCanal HCI Console \n' + + 'Type \'help\' for more information.') + + +if __name__ == '__main__': + main(sys.argv) diff --git a/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py b/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py new file mode 100644 index 000000000..c7b304520 --- /dev/null +++ b/vendor_libs/test_vendor_lib/scripts/link_layer_socket.py @@ -0,0 +1,192 @@ +# +# Copyright 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Script for sending data to a port. + +This script provides a simple shell interface for sending data at run-time to a +port. + +Usage: + 1. Choose a port to use. Use 'adb forward tcp: + tcp:' to forward the port to the device. + 2. In a separate shell, build and push the test vendor library to the device + using the script mentioned in option A (i.e. without the --test-channel flag + set). + 3. Once logcat has started, turn Bluetooth on from the device. + 4. Run this program, in the shell from step 1, the port, also from step 1, + as arguments. + + scapy is the tool we use to build packets in Python. + + >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) / + Raw(load='\x01') + >>> print(d) + >> + >>> raw(d) + '\x01\x04\x10\x01\x01' + >>> hexdump(d) + 0000 0104100101 ..... + + + >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') / + L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2) + >>> pkt + >>>> + >>> pkt = HCI_Hdr(type='ACL Data') / HCI_ACL_Hdr(handle=2, PB=0, BC=2, + len=10) / L2CAP_Hdr(len=6, cid='control') / L2CAP_CmdHdr(code='info_req', + id=2, len=2) / L2CAP_InfoReq(type='FEAT_MASK') + >>> raw(pkt) + '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00' + >>> hexdump(pkt) + 0000 0202200A00060001000A0202000200 .. ............ + + +""" + +#!/usr/bin/env python + +import binascii +import cmd +import queue +import random +import socket +import string +import struct +import sys + +class LinkLayerSocket(object): + """Simple wrapper class for a socket object. + + Attributes: + socket: The underlying socket created for the specified address and port. + """ + + def __init__(self, port): + print('port = ' + port) + self.done_ = False + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) + # Should it be a non-blocking socket? + # self._socket.setblocking(0) + self.packets_ = queue.Queue() + self.rx_thread_ = threading.Thread(target=self.rx_thread_body) + self.rx_thread_.start() + + def rx_bytes(self, size): + while not self.done_: + raw_bytes = b'' + while len(raw_bytes) < size and not self.done_: + more_raw_bytes = self._socket.recv(min(size - len(raw_bytes), 2048)) + if more_raw_bytes: + raw_bytes += more_raw_bytes + return raw_bytes + + def rx_thread_body(self): + while not self.done_: + payload_length = 0 + # Read the size (4B), the type (1B), and the addresses (2*6B) + header = self.rx_bytes(17) + if not header: + continue + payload_length = header[0] + payload_length |= header[1] << 8 + payload_length |= header[2] << 16 + payload_length |= header[3] << 24 + print('Rx: type_byte ' + hex(header[4])) + print('Rx: from ' + hex(header[5]) + ':' + hex(header[6]) + ':' + hex(header[7]) + ':' + hex(header[8]) + ':' + hex(header[9]) + ':' + hex(header[10])) + print('Rx: to ' + hex(header[11]) + ':' + hex(header[12]) + ':' + hex(header[13]) + ':' + hex(header[14]) + ':' + hex(header[15]) + ':' + hex(header[16])) + # Read the Payload + payload = self.rx_bytes(payload_length) if payload_length != 0 else b'' + packet_bytes = header + payload + self.packets_.put(packet_bytes) + + def get_packet(self): + if self.packets_.empty(): + return False + return self.packets_.get() + + def send_binary(self, args): + joined_args = ''.join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + if self._done: + return + self._connection.send(packet) + + def tell_rx_thread_to_quit(self): + self.done_ = True + self.rx_thread_.join() + + +class LinkLayerShell(cmd.Cmd): + """Shell for sending binary data to a port. + + """ + + def __init__(self, link_layer): + cmd.Cmd.__init__(self) + self._link_layer = link_layer + + def do_send(self, args): + """Arguments: binary representation of a packet. + + """ + self._link_layer.send_binary(args.split()) + + def do_quit(self, args): + """Arguments: None. + + Exits. + """ + self._link_layer.tell_rx_thread_to_quit() + self._link_layer.close() + print('Goodbye.') + return True + + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + + """ + if (len(args) == 0): + cmd.Cmd.do_help(self, args) + + +def main(argv): + if len(argv) != 2: + print('Usage: python link_layer_socket.py [port]') + return + try: + port = int(argv[1]) + except ValueError: + print('Error parsing port.') + else: + try: + link_layer = LinkLayerSocket(port) + except socket.error as e: + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + link_layer_shell = LinkLayerShell(link_layer) + link_layer_shell.prompt = '$ ' + link_layer_shell.cmdloop('Welcome to the RootCanal LinkLayer Console \n' + + 'Type \'help\' for more information.') + + +if __name__ == '__main__': + main(sys.argv) diff --git a/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py b/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py new file mode 100644 index 000000000..cadeab3c7 --- /dev/null +++ b/vendor_libs/test_vendor_lib/scripts/send_simple_commands.py @@ -0,0 +1,220 @@ +# +# Copyright 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Script for sending data to a port. + +This script provides a simple shell interface for sending data at run-time to a +port. + +Usage: + 1. Choose a port to use. Use 'adb forward tcp: + tcp:' to forward the port to the device. + 2. In a separate shell, build and push the test vendor library to the device + using the script mentioned in option A (i.e. without the --test-channel flag + set). + 3. Once logcat has started, turn Bluetooth on from the device. + 4. Run this program, in the shell from step 1, the port, also from step 1, + as arguments. + + scapy is the tool we use to build packets in Python. + + >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) / + Raw(load='\x01') + >>> print(d) + >> + >>> raw(d) + '\x01\x04\x10\x01\x01' + >>> hexdump(d) + 0000 0104100101 ..... + + + >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') / + L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2) + >>> pkt + >>>> + >>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2, + len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req", + id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK") + >>> raw(pkt) + '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00' + >>> hexdump(pkt) + 0000 0202200A00060001000A0202000200 .. ............ + + +""" + +#!/usr/bin/env python + +import binascii +import cmd +import random +import socket +import string +import struct +import sys +from scapy.all import * + +DEVICE_NAME_LENGTH = 6 +DEVICE_ADDRESS_LENGTH = 6 + + +# Used to generate fake device names and addresses during discovery. +def generate_random_name(): + return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ + string.digits) for _ in range(DEVICE_NAME_LENGTH)) + + +def generate_random_address(): + return ''.join(random.SystemRandom().choice(string.digits) for _ in \ + range(DEVICE_ADDRESS_LENGTH)) + + +class Connection(object): + """Simple wrapper class for a socket object. + + Attributes: + socket: The underlying socket created for the specified address and port. + """ + + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) + self._socket.setblocking(0) + + def close(self): + self._socket.close() + + def send(self, data): + self._socket.sendall(data) + + def receive(self, size): + return self._socket.recv(size) + + +class RawPort(object): + """Checks outgoing commands and sends them once verified. + + Attributes: + connection: The connection to the HCI port. + """ + + def __init__(self, port): + self._connection = Connection(port) + self._closed = False + + def close(self): + self._connection.close() + self._closed = True + + def send_binary(self, args): + joined_args = ''.join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + if self._closed: + return + self._connection.send(packet) + received = self.receive_response() + received_bytes = bytearray(received) + print(raw(received_bytes)) + + def receive_response(self): + if self._closed: + return + size_chars = self._connection.receive(4) + if not size_chars: + print('Debug: No response') + return False + size_bytes = bytearray(size_chars) + response_size = 0 + for i in range(0, len(size_chars) - 1): + response_size |= ord(size_chars[i]) << (8 * i) + response = self._connection.receive(response_size) + return response + + def lint_command(self, name, args, name_size, args_size): + assert name_size == len(name) and args_size == len(args) + try: + name.encode('utf-8') + for arg in args: + arg.encode('utf-8') + except UnicodeError: + print('Unrecognized characters.') + raise + if name_size > 255 or args_size > 255: + raise ValueError # Size must be encodable in one octet. + for arg in args: + if len(arg) > 255: + raise ValueError # Size must be encodable in one octet. + + +class RawPortShell(cmd.Cmd): + """Shell for sending binary data to a port. + + """ + + def __init__(self, raw_port): + cmd.Cmd.__init__(self) + self._raw_port = raw_port + + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. + + """ + self._raw_port.send_binary(args.split()) + + def do_quit(self, args): + """Arguments: None. + + Exits. + """ + self._raw_port.close() + print('Goodbye.') + return True + + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + + """ + if (len(args) == 0): + cmd.Cmd.do_help(self, args) + + +def main(argv): + if len(argv) != 2: + print('Usage: python raw_port.py [port]') + return + try: + port = int(argv[1]) + except ValueError: + print('Error parsing port.') + else: + try: + raw_port = RawPort(port) + except (socket.error, e): + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + raw_port_shell = RawPortShell(raw_port) + raw_port_shell.prompt = '$ ' + raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + + 'Type \'help\' for more information.') + + +if __name__ == '__main__': + main(sys.argv) diff --git a/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py b/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py new file mode 100644 index 000000000..982aee2dc --- /dev/null +++ b/vendor_libs/test_vendor_lib/scripts/simple_link_layer_socket.py @@ -0,0 +1,197 @@ +# +# Copyright 2018 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Script for sending data to a port. + +This script provides a simple shell interface for sending data at run-time to a +port. + +Usage: + 1. Choose a port to use. Use 'adb forward tcp: + tcp:' to forward the port to the device. + 2. In a separate shell, build and push the test vendor library to the device + using the script mentioned in option A (i.e. without the --test-channel flag + set). + 3. Once logcat has started, turn Bluetooth on from the device. + 4. Run this program, in the shell from step 1, the port, also from step 1, + as arguments. + + + length type source dest addr_type adv_type EIR Data + + adv public connectable Name (TATA) +send 18000000 07 010203040506 000000000000 00 00 050954415441 + 020106 + + length adv source dest public non-conn Name (TETE) +send 18000000 07 010203040507 000000000000 00 03 050954455445 + 020106 + + length scan_rsp source dest public scan_rsp Name (TFTF) +send 18000000 09 010203040506 4de24c67454b 00 04 050954465446 + 020106 + +""" + +#!/usr/bin/env python + +import binascii +import cmd +import random +import socket +import string +import struct +import sys + +DEVICE_NAME_LENGTH = 6 +DEVICE_ADDRESS_LENGTH = 6 + + +# Used to generate fake device names and addresses during discovery. +def generate_random_name(): + return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \ + string.digits) for _ in range(DEVICE_NAME_LENGTH)) + + +def generate_random_address(): + return ''.join(random.SystemRandom().choice(string.digits) for _ in \ + range(DEVICE_ADDRESS_LENGTH)) + + +class Connection(object): + """Simple wrapper class for a socket object. + + Attributes: + socket: The underlying socket created for the specified address and port. + """ + + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(('localhost', port)) + self._socket.setblocking(0) + + def close(self): + self._socket.close() + + def send(self, data): + self._socket.sendall(data) + + def receive(self, size): + return self._socket.recv(size) + + +class RawPort(object): + """Checks outgoing commands and sends them once verified. + + Attributes: + connection: The connection to the HCI port. + """ + + def __init__(self, port): + self._connection = Connection(port) + self._closed = False + + def close(self): + self._connection.close() + self._closed = True + + def send_binary(self, args): + joined_args = ''.join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + if self._closed: + return + self._connection.send(packet) + + def receive_response(self): + if self._closed: + return + size_chars = self._connection.receive(4) + if not size_chars: + print('Debug: No response') + return False + size_bytes = bytearray(size_chars) + response_size = 0 + for i in range(0, len(size_chars) - 1): + response_size |= ord(size_chars[i]) << (8 * i) + response = self._connection.receive(response_size) + return response + + def lint_command(self, name, args, name_size, args_size): + assert name_size == len(name) and args_size == len(args) + try: + name.encode('utf-8') + for arg in args: + arg.encode('utf-8') + except UnicodeError: + print('Unrecognized characters.') + raise + if name_size > 255 or args_size > 255: + raise ValueError # Size must be encodable in one octet. + for arg in args: + if len(arg) > 255: + raise ValueError # Size must be encodable in one octet. + + +class RawPortShell(cmd.Cmd): + """Shell for sending binary data to a port.""" + + def __init__(self, raw_port): + cmd.Cmd.__init__(self) + self._raw_port = raw_port + + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str.""" + self._raw_port.send_binary(args.split()) + + def do_quit(self, args): + """Arguments: None. + + Exits. + """ + self._raw_port.close() + print('Goodbye.') + return True + + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.""" + if (len(args) == 0): + cmd.Cmd.do_help(self, args) + + +def main(argv): + if len(argv) != 2: + print('Usage: python raw_port.py [port]') + return + try: + port = int(argv[1]) + except ValueError: + print('Error parsing port.') + else: + try: + raw_port = RawPort(port) + except (socket.error, e): + print('Error connecting to socket: %s' % e) + except: + print('Error creating (check arguments).') + else: + raw_port_shell = RawPortShell(raw_port) + raw_port_shell.prompt = '$ ' + raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + + 'Type \'help\' for more information.') + + +if __name__ == '__main__': + main(sys.argv) diff --git a/vendor_libs/test_vendor_lib/scripts/simple_stack.py b/vendor_libs/test_vendor_lib/scripts/simple_stack.py new file mode 100644 index 000000000..4f42bf2c6 --- /dev/null +++ b/vendor_libs/test_vendor_lib/scripts/simple_stack.py @@ -0,0 +1,238 @@ +# +# Copyright 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Script for sending data to a port. + +This script provides a simple shell interface for sending data at run-time to a +port. + +Usage: + 1. Choose a port to use. Use 'adb forward tcp: + tcp:' to forward the port to the device. + 2. In a separate shell, build and push the test vendor library to the device + using the script mentioned in option A (i.e. without the --test-channel flag + set). + 3. Once logcat has started, turn Bluetooth on from the device. + 4. Run this program, in the shell from step 1, the port, also from step 1, + as arguments. + + scapy is the tool we use to build packets in Python. + + >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) / + Raw(load='\x01') + >>> print(d) + >> + >>> raw(d) + '\x01\x04\x10\x01\x01' + >>> hexdump(d) + 0000 0104100101 ..... + + + >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') / + L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2) + >>> pkt + >>>> + >>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2, + len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req", + id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK") + >>> raw(pkt) + '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00' + >>> hexdump(pkt) + 0000 0202200A00060001000A0202000200 .. ............ + + +""" + +#!/usr/bin/env python + +import binascii +import cmd +import random +import socket +import string +import struct +import sys +from scapy.all import * +""" Add some more SCAPY stuff""" + + +class HCI_Cmd_Connect(Packet): + name = "Connect" + fields_desc = [ + ByteEnumField("filter", 0, {0: "address"}), + LEShortField("packet_type", 8), + ByteEnumField("page_scan_repetition_mode", 0, { + 0: "R0", + 1: "R1", + 2: "R2" + }), + ByteEnumField("page_scan_repetition_mode", 0, {0: "Reserved"}), + LEShortField("clock_offset", 0), + ByteEnumField("allow_role_switch", 0, { + 0: "false", + 1: "true" + }), + ] + + +class HCI_Cmd_Inquiry(Packet): + name = "Inquiry" + fields_desc = [ + XByteField("LAP0", 0), + XByteField("LAP1", 0x8B), + XByteField("LAP2", 0x9E), + ByteField("length", 1), + ByteField("max_responses", 0), + ] + + +""" END SCAPY stuff""" + + +class Connection(object): + """Simple wrapper class for a socket object. + + Attributes: + socket: The underlying socket created for the specified address and port. + """ + + def __init__(self, port): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.connect(("localhost", port)) + + def close(self): + self._socket.close() + + def send(self, data): + self._socket.sendall(data) + + def receive(self, size): + return self._socket.recv(size) + + +class RawPort(object): + """Converts outgoing packets to binary and sends them. + + Attributes: + connection: The connection to the HCI port. + """ + + def __init__(self, port): + self._connection = Connection(port) + + def close(self): + self._connection.close() + + def send_binary(self, args): + joined_args = "".join(arg for arg in args) + print(joined_args) + packet = binascii.a2b_hex(joined_args) + self._connection.send(packet) + + def receive_response(self): + ready_to_read, ready_to_write, in_error = \ + select.select( + [ self._connection._socket ], + [ ], + [ self._connection._socket ], + 1.5) + if len(in_error) > 0: + print("Error") + return False + if len(ready_to_read) > 0: + print("Ready to Read") + type_str = self._connection.receive(512) + print(len(type_str)) + print(type_str) + return type_str + print("Returning false at the end") + return False + + +class RawPortShell(cmd.Cmd): + """Shell for sending binary data to a port. + + """ + + def __init__(self, raw_port): + cmd.Cmd.__init__(self) + self._raw_port = raw_port + + def do_send(self, args): + """Arguments: dev_type_str Add a new device of type dev_type_str. + + """ + self._raw_port.send_binary(args.split()) + + def do_scan(self, args): + """Arguments: timeout (seconds) Print the scan responses from reachable devices + + """ + self._raw_port.send_binary(args.split()) + + def do_quit(self, args): + """Arguments: None. + + Exits. + """ + self._raw_port.close() + print("Goodbye.") + return True + + def do_help(self, args): + """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr. + + """ + if (len(args) == 0): + cmd.Cmd.do_help(self, args) + + def postcmd(self, stop, line): + """Called after each command stop : whether we will stop after this command line : the previous input line Return True to stop, False to continue + + """ + if stop: + return True + response = self._raw_port.receive_response() + print(response) + return False + + +def main(argv): + if len(argv) != 2: + print("Usage: python raw_port.py [port]") + return + try: + port = int(argv[1]) + except ValueError: + print("Error parsing port.") + else: + try: + raw_port = RawPort(port) + except (socket.error, e): + print("Error connecting to socket: %s" % e) + except: + print("Error creating (check arguments).") + else: + raw_port_shell = RawPortShell(raw_port) + raw_port_shell.prompt = "$ " + raw_port_shell.cmdloop("Welcome to the RootCanal Console \n" + + 'Type \'help\' for more information.') + + +if __name__ == "__main__": + main(sys.argv) -- 2.11.0