From ab56cdce497e787c04ab5ee651e40eb9f530ca3b Mon Sep 17 00:00:00 2001 From: Zach Johnson Date: Fri, 28 Feb 2020 17:37:17 -0800 Subject: [PATCH] Add PyHciAclConnection Does not yet handle incoming data in an abstracted way. Test: cert/run --host --test_filter=AclManagerTest Change-Id: I68bbafdd65e5f76c8a368a5fd380634a4e93eaba --- gd/hci/cert/acl_manager_test.py | 105 +++++----------------------------------- gd/hci/cert/py_hci.py | 61 +++++++++++++++++++++++ 2 files changed, 72 insertions(+), 94 deletions(-) diff --git a/gd/hci/cert/acl_manager_test.py b/gd/hci/cert/acl_manager_test.py index 4f6469cd9..66482e661 100644 --- a/gd/hci/cert/acl_manager_test.py +++ b/gd/hci/cert/acl_manager_test.py @@ -39,20 +39,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): def setup_class(self): super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI') - def enqueue_acl_data(self, handle, pb_flag, b_flag, acl): - acl_msg = hci_facade.AclMsg( - handle=int(handle), - packet_boundary_flag=int(pb_flag), - broadcast_flag=int(b_flag), - data=acl) - self.cert.hci.SendAclData(acl_msg) - def test_dut_connects(self): - self.cert.hci.register_for_events( - hci_packets.EventCode.CONNECTION_REQUEST, - hci_packets.EventCode.CONNECTION_COMPLETE, - hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: @@ -67,29 +54,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): address=bytes(cert_address, 'utf8')))) as connection_event_stream: - # Cert Accepts - connection_request = ConnectionRequestCapture() - assertThat( - cert_hci.get_event_stream()).emits(connection_request) - - cert_hci.send_command_with_status( - hci_packets.AcceptConnectionRequestBuilder( - connection_request.get().GetBdAddr(), - hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) - - # Cert gets ConnectionComplete with a handle and sends ACL data - connection_complete = ConnectionCompleteCapture() - assertThat( - cert_hci.get_event_stream()).emits(connection_complete) - cert_handle = connection_complete.get().GetConnectionHandle() - - self.enqueue_acl_data( - cert_handle, hci_packets.PacketBoundaryFlag. - FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes( - b'\x26\x00\x07\x00This is just SomeAclData from the Cert' - )) + cert_acl = cert_hci.accept_connection() + cert_acl.send_first( + b'\x26\x00\x07\x00This is just SomeAclData from the Cert') # DUT gets a connection complete event and sends and receives connection_complete = ConnectionCompleteCapture() @@ -109,11 +76,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_cert_connects(self): - self.cert.hci.register_for_events( - hci_packets.EventCode.ROLE_CHANGE, - hci_packets.EventCode.CONNECTION_COMPLETE, - hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: @@ -124,15 +86,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): self.dut.neighbor.EnablePageScan( neighbor_facade.EnableMsg(enabled=True)) - # Cert connects - cert_hci.send_command_with_status( - hci_packets.CreateConnectionBuilder( - dut_address.decode('utf-8'), - 0xcc18, # Packet Type - hci_packets.PageScanRepetitionMode.R1, - 0x0, - hci_packets.ClockOffsetValid.INVALID, - hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) + cert_hci.initiate_connection(dut_address) # DUT gets a connection request connection_complete = ConnectionCompleteCapture() @@ -146,16 +100,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) - connection_complete = ConnectionCompleteCapture() - assertThat(cert_hci.get_event_stream()).emits(connection_complete) - cert_handle = connection_complete.get().GetConnectionHandle() - - self.enqueue_acl_data( - cert_handle, - hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes( - b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) + cert_acl = cert_hci.complete_connection() + cert_acl.send_first( + b'\x26\x00\x07\x00This is just SomeAclData from the Cert') assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) @@ -163,11 +110,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): lambda packet: b'SomeAclData' in packet.payload) def test_recombination_l2cap_packet(self): - self.cert.hci.register_for_events( - hci_packets.EventCode.CONNECTION_REQUEST, - hci_packets.EventCode.CONNECTION_COMPLETE, - hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: @@ -183,35 +125,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): address=bytes(cert_address, 'utf8')))) as connection_event_stream: - # Cert Accepts - connection_request = ConnectionRequestCapture() - assertThat( - cert_hci.get_event_stream()).emits(connection_request) - cert_hci.send_command_with_status( - hci_packets.AcceptConnectionRequestBuilder( - connection_request.get().GetBdAddr(), - hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) - - # Cert gets ConnectionComplete with a handle and sends ACL data - connection_complete = ConnectionCompleteCapture() - assertThat( - cert_hci.get_event_stream()).emits(connection_complete) - cert_handle = connection_complete.get().GetConnectionHandle() - - self.enqueue_acl_data( - cert_handle, hci_packets.PacketBoundaryFlag. - FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes(b'\x06\x00\x07\x00Hello')) - self.enqueue_acl_data( - cert_handle, - hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, - hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!')) - self.enqueue_acl_data( - cert_handle, hci_packets.PacketBoundaryFlag. - FIRST_AUTOMATICALLY_FLUSHABLE, - hci_packets.BroadcastFlag.POINT_TO_POINT, - bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200)) + cert_acl = cert_hci.accept_connection() + cert_acl.send_first(b'\x06\x00\x07\x00Hello') + cert_acl.send_continuing(b'!') + cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200) # DUT gets a connection complete event and sends and receives connection_complete = ConnectionCompleteCapture() diff --git a/gd/hci/cert/py_hci.py b/gd/hci/cert/py_hci.py index bf61f7917..548a12aa8 100644 --- a/gd/hci/cert/py_hci.py +++ b/gd/hci/cert/py_hci.py @@ -17,14 +17,48 @@ from google.protobuf import empty_pb2 as empty_proto from cert.event_stream import EventStream from captures import ReadBdAddrCompleteCapture +from captures import ConnectionCompleteCapture +from captures import ConnectionRequestCapture from bluetooth_packets_python3 import hci_packets from cert.truth import assertThat +from hci.facade import facade_pb2 as hci_facade + + +class PyHciAclConnection(object): + + def __init__(self, handle, acl_stream, device): + self.handle = handle + self.acl_stream = acl_stream + self.device = device + + def send(self, pb_flag, b_flag, data): + acl_msg = hci_facade.AclMsg( + handle=int(self.handle), + packet_boundary_flag=int(pb_flag), + broadcast_flag=int(b_flag), + data=data) + self.device.hci.SendAclData(acl_msg) + + def send_first(self, data): + self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE, + hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) + + def send_continuing(self, data): + self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT, + hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data)) class PyHci(object): def __init__(self, device): self.device = device + + self.device.hci.register_for_events( + hci_packets.EventCode.ROLE_CHANGE, + hci_packets.EventCode.CONNECTION_REQUEST, + hci_packets.EventCode.CONNECTION_COMPLETE, + hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) + self.event_stream = self.device.hci.new_event_stream() self.acl_stream = EventStream( self.device.hci.FetchAclPackets(empty_proto.Empty())) @@ -65,3 +99,30 @@ class PyHci(object): read_bd_addr = ReadBdAddrCompleteCapture() assertThat(self.event_stream).emits(read_bd_addr) return read_bd_addr.get().GetBdAddr() + + def initiate_connection(self, remote_addr): + self.send_command_with_status( + hci_packets.CreateConnectionBuilder( + remote_addr.decode('utf-8'), + 0xcc18, # Packet Type + hci_packets.PageScanRepetitionMode.R1, + 0x0, + hci_packets.ClockOffsetValid.INVALID, + hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) + + def accept_connection(self): + connection_request = ConnectionRequestCapture() + assertThat(self.event_stream).emits(connection_request) + + self.send_command_with_status( + hci_packets.AcceptConnectionRequestBuilder( + connection_request.get().GetBdAddr(), + hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) + return self.complete_connection() + + def complete_connection(self): + connection_complete = ConnectionCompleteCapture() + assertThat(self.event_stream).emits(connection_complete) + + handle = connection_complete.get().GetConnectionHandle() + return PyHciAclConnection(handle, self.acl_stream, self.device) -- 2.11.0