OSDN Git Service

Add PyHciAclConnection
authorZach Johnson <zachoverflow@google.com>
Sat, 29 Feb 2020 01:37:17 +0000 (17:37 -0800)
committerZach Johnson <zachoverflow@google.com>
Sat, 29 Feb 2020 01:39:49 +0000 (17:39 -0800)
Does not yet handle incoming data in an abstracted way.

Test: cert/run --host --test_filter=AclManagerTest
Change-Id: I68bbafdd65e5f76c8a368a5fd380634a4e93eaba

gd/hci/cert/acl_manager_test.py
gd/hci/cert/py_hci.py

index 4f6469c..66482e6 100644 (file)
@@ -39,20 +39,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
     def setup_class(self):
         super().setup_class(dut_module='HCI_INTERFACES', cert_module='HCI')
 
-    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
-        acl_msg = hci_facade.AclMsg(
-            handle=int(handle),
-            packet_boundary_flag=int(pb_flag),
-            broadcast_flag=int(b_flag),
-            data=acl)
-        self.cert.hci.SendAclData(acl_msg)
-
     def test_dut_connects(self):
-        self.cert.hci.register_for_events(
-            hci_packets.EventCode.CONNECTION_REQUEST,
-            hci_packets.EventCode.CONNECTION_COMPLETE,
-            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
-
         with PyHci(self.cert) as cert_hci, \
             EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
 
@@ -67,29 +54,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                             address=bytes(cert_address,
                                           'utf8')))) as connection_event_stream:
 
-                # Cert Accepts
-                connection_request = ConnectionRequestCapture()
-                assertThat(
-                    cert_hci.get_event_stream()).emits(connection_request)
-
-                cert_hci.send_command_with_status(
-                    hci_packets.AcceptConnectionRequestBuilder(
-                        connection_request.get().GetBdAddr(),
-                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
-
-                # Cert gets ConnectionComplete with a handle and sends ACL data
-                connection_complete = ConnectionCompleteCapture()
-                assertThat(
-                    cert_hci.get_event_stream()).emits(connection_complete)
-                cert_handle = connection_complete.get().GetConnectionHandle()
-
-                self.enqueue_acl_data(
-                    cert_handle, hci_packets.PacketBoundaryFlag.
-                    FIRST_AUTOMATICALLY_FLUSHABLE,
-                    hci_packets.BroadcastFlag.POINT_TO_POINT,
-                    bytes(
-                        b'\x26\x00\x07\x00This is just SomeAclData from the Cert'
-                    ))
+                cert_acl = cert_hci.accept_connection()
+                cert_acl.send_first(
+                    b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
 
                 # DUT gets a connection complete event and sends and receives
                 connection_complete = ConnectionCompleteCapture()
@@ -109,11 +76,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                     lambda packet: b'SomeAclData' in packet.payload)
 
     def test_cert_connects(self):
-        self.cert.hci.register_for_events(
-            hci_packets.EventCode.ROLE_CHANGE,
-            hci_packets.EventCode.CONNECTION_COMPLETE,
-            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
-
         with PyHci(self.cert) as cert_hci, \
             EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
             EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
@@ -124,15 +86,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
             self.dut.neighbor.EnablePageScan(
                 neighbor_facade.EnableMsg(enabled=True))
 
-            # Cert connects
-            cert_hci.send_command_with_status(
-                hci_packets.CreateConnectionBuilder(
-                    dut_address.decode('utf-8'),
-                    0xcc18,  # Packet Type
-                    hci_packets.PageScanRepetitionMode.R1,
-                    0x0,
-                    hci_packets.ClockOffsetValid.INVALID,
-                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
+            cert_hci.initiate_connection(dut_address)
 
             # DUT gets a connection request
             connection_complete = ConnectionCompleteCapture()
@@ -146,16 +100,9 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                         b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
                     )))
 
-            connection_complete = ConnectionCompleteCapture()
-            assertThat(cert_hci.get_event_stream()).emits(connection_complete)
-            cert_handle = connection_complete.get().GetConnectionHandle()
-
-            self.enqueue_acl_data(
-                cert_handle,
-                hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
-                hci_packets.BroadcastFlag.POINT_TO_POINT,
-                bytes(
-                    b'\x26\x00\x07\x00This is just SomeAclData from the Cert'))
+            cert_acl = cert_hci.complete_connection()
+            cert_acl.send_first(
+                b'\x26\x00\x07\x00This is just SomeAclData from the Cert')
 
             assertThat(cert_hci.get_acl_stream()).emits(
                 lambda packet: b'SomeMoreAclData' in packet.data)
@@ -163,11 +110,6 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                 lambda packet: b'SomeAclData' in packet.payload)
 
     def test_recombination_l2cap_packet(self):
-        self.cert.hci.register_for_events(
-            hci_packets.EventCode.CONNECTION_REQUEST,
-            hci_packets.EventCode.CONNECTION_COMPLETE,
-            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
-
         with PyHci(self.cert) as cert_hci, \
             EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
 
@@ -183,35 +125,10 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                             address=bytes(cert_address,
                                           'utf8')))) as connection_event_stream:
 
-                # Cert Accepts
-                connection_request = ConnectionRequestCapture()
-                assertThat(
-                    cert_hci.get_event_stream()).emits(connection_request)
-                cert_hci.send_command_with_status(
-                    hci_packets.AcceptConnectionRequestBuilder(
-                        connection_request.get().GetBdAddr(),
-                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
-
-                # Cert gets ConnectionComplete with a handle and sends ACL data
-                connection_complete = ConnectionCompleteCapture()
-                assertThat(
-                    cert_hci.get_event_stream()).emits(connection_complete)
-                cert_handle = connection_complete.get().GetConnectionHandle()
-
-                self.enqueue_acl_data(
-                    cert_handle, hci_packets.PacketBoundaryFlag.
-                    FIRST_AUTOMATICALLY_FLUSHABLE,
-                    hci_packets.BroadcastFlag.POINT_TO_POINT,
-                    bytes(b'\x06\x00\x07\x00Hello'))
-                self.enqueue_acl_data(
-                    cert_handle,
-                    hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
-                    hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))
-                self.enqueue_acl_data(
-                    cert_handle, hci_packets.PacketBoundaryFlag.
-                    FIRST_AUTOMATICALLY_FLUSHABLE,
-                    hci_packets.BroadcastFlag.POINT_TO_POINT,
-                    bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200))
+                cert_acl = cert_hci.accept_connection()
+                cert_acl.send_first(b'\x06\x00\x07\x00Hello')
+                cert_acl.send_continuing(b'!')
+                cert_acl.send_first(b'\xe8\x03\x07\x00' + b'Hello' * 200)
 
                 # DUT gets a connection complete event and sends and receives
                 connection_complete = ConnectionCompleteCapture()
index bf61f79..548a12a 100644 (file)
 from google.protobuf import empty_pb2 as empty_proto
 from cert.event_stream import EventStream
 from captures import ReadBdAddrCompleteCapture
+from captures import ConnectionCompleteCapture
+from captures import ConnectionRequestCapture
 from bluetooth_packets_python3 import hci_packets
 from cert.truth import assertThat
+from hci.facade import facade_pb2 as hci_facade
+
+
+class PyHciAclConnection(object):
+
+    def __init__(self, handle, acl_stream, device):
+        self.handle = handle
+        self.acl_stream = acl_stream
+        self.device = device
+
+    def send(self, pb_flag, b_flag, data):
+        acl_msg = hci_facade.AclMsg(
+            handle=int(self.handle),
+            packet_boundary_flag=int(pb_flag),
+            broadcast_flag=int(b_flag),
+            data=data)
+        self.device.hci.SendAclData(acl_msg)
+
+    def send_first(self, data):
+        self.send(hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
+                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))
+
+    def send_continuing(self, data):
+        self.send(hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
+                  hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(data))
 
 
 class PyHci(object):
 
     def __init__(self, device):
         self.device = device
+
+        self.device.hci.register_for_events(
+            hci_packets.EventCode.ROLE_CHANGE,
+            hci_packets.EventCode.CONNECTION_REQUEST,
+            hci_packets.EventCode.CONNECTION_COMPLETE,
+            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
+
         self.event_stream = self.device.hci.new_event_stream()
         self.acl_stream = EventStream(
             self.device.hci.FetchAclPackets(empty_proto.Empty()))
@@ -65,3 +99,30 @@ class PyHci(object):
         read_bd_addr = ReadBdAddrCompleteCapture()
         assertThat(self.event_stream).emits(read_bd_addr)
         return read_bd_addr.get().GetBdAddr()
+
+    def initiate_connection(self, remote_addr):
+        self.send_command_with_status(
+            hci_packets.CreateConnectionBuilder(
+                remote_addr.decode('utf-8'),
+                0xcc18,  # Packet Type
+                hci_packets.PageScanRepetitionMode.R1,
+                0x0,
+                hci_packets.ClockOffsetValid.INVALID,
+                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))
+
+    def accept_connection(self):
+        connection_request = ConnectionRequestCapture()
+        assertThat(self.event_stream).emits(connection_request)
+
+        self.send_command_with_status(
+            hci_packets.AcceptConnectionRequestBuilder(
+                connection_request.get().GetBdAddr(),
+                hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
+        return self.complete_connection()
+
+    def complete_connection(self):
+        connection_complete = ConnectionCompleteCapture()
+        assertThat(self.event_stream).emits(connection_complete)
+
+        handle = connection_complete.get().GetConnectionHandle()
+        return PyHciAclConnection(handle, self.acl_stream, self.device)