OSDN Git Service

Add PyHci, to simplify interactions with HCI (not HCI_INTERFACES)
authorZach Johnson <zachoverflow@google.com>
Sat, 29 Feb 2020 00:59:04 +0000 (16:59 -0800)
committerZach Johnson <zachoverflow@google.com>
Sat, 29 Feb 2020 01:00:45 +0000 (17:00 -0800)
Test: cert/run --host --test_filter=AclManagerTest
Change-Id: Id9b7d89aba3369b32fa17759c912439ba43d37b3

gd/hci/cert/acl_manager_test.py
gd/hci/cert/py_hci.py [new file with mode: 0644]

index e3d475d..ab5dc4e 100644 (file)
@@ -31,6 +31,7 @@ from bluetooth_packets_python3 import hci_packets
 from captures import ReadBdAddrCompleteCapture
 from captures import ConnectionCompleteCapture
 from captures import ConnectionRequestCapture
+from py_hci import PyHci
 
 
 class AclManagerTest(GdFacadeOnlyBaseTestClass):
@@ -52,20 +53,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
             hci_packets.EventCode.CONNECTION_COMPLETE,
             hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
 
-        with self.cert.hci.new_event_stream() as cert_hci_event_stream, \
-            EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
+        with PyHci(self.cert) as cert_hci, \
             EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
 
             # CERT Enables scans and gets its address
-            self.cert.hci.send_command_with_complete(
+            cert_hci.send_command_with_complete(
                 hci_packets.WriteScanEnableBuilder(
                     hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
 
-            self.cert.hci.send_command_with_complete(
-                hci_packets.ReadBdAddrBuilder())
+            cert_hci.send_command_with_complete(hci_packets.ReadBdAddrBuilder())
 
             read_bd_addr = ReadBdAddrCompleteCapture()
-            assertThat(cert_hci_event_stream).emits(read_bd_addr)
+            assertThat(cert_hci.get_event_stream()).emits(read_bd_addr)
             cert_address = read_bd_addr.get().GetBdAddr()
 
             with EventStream(
@@ -78,16 +77,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
 
                 # Cert Accepts
                 connection_request = ConnectionRequestCapture()
-                assertThat(cert_hci_event_stream).emits(connection_request)
+                assertThat(
+                    cert_hci.get_event_stream()).emits(connection_request)
 
-                self.cert.hci.send_command_with_status(
+                cert_hci.send_command_with_status(
                     hci_packets.AcceptConnectionRequestBuilder(
                         connection_request.get().GetBdAddr(),
                         hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
 
                 # Cert gets ConnectionComplete with a handle and sends ACL data
                 connection_complete = ConnectionCompleteCapture()
-                assertThat(cert_hci_event_stream).emits(connection_complete)
+                assertThat(
+                    cert_hci.get_event_stream()).emits(connection_complete)
                 cert_handle = connection_complete.get().GetConnectionHandle()
 
                 self.enqueue_acl_data(
@@ -110,7 +111,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                             b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
                         )))
 
-                assertThat(cert_acl_data_stream).emits(
+                assertThat(cert_hci.get_acl_stream()).emits(
                     lambda packet: b'SomeMoreAclData' in packet.data)
                 assertThat(acl_data_stream).emits(
                     lambda packet: b'SomeAclData' in packet.payload)
@@ -121,8 +122,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
             hci_packets.EventCode.CONNECTION_COMPLETE,
             hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
 
-        with self.cert.hci.new_event_stream() as cert_hci_event_stream, \
-            EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
+        with PyHci(self.cert) as cert_hci, \
             EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
             EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
 
@@ -133,7 +133,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                 neighbor_facade.EnableMsg(enabled=True))
 
             # Cert connects
-            self.cert.hci.send_command_with_status(
+            cert_hci.send_command_with_status(
                 hci_packets.CreateConnectionBuilder(
                     dut_address.decode('utf-8'),
                     0xcc18,  # Packet Type
@@ -155,7 +155,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                     )))
 
             connection_complete = ConnectionCompleteCapture()
-            assertThat(cert_hci_event_stream).emits(connection_complete)
+            assertThat(cert_hci.get_event_stream()).emits(connection_complete)
             cert_handle = connection_complete.get().GetConnectionHandle()
 
             self.enqueue_acl_data(
@@ -165,7 +165,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
                 bytes(
                     b'\x26\x00\x07\x00This is just SomeAclData from the Cert'))
 
-            assertThat(cert_acl_data_stream).emits(
+            assertThat(cert_hci.get_acl_stream()).emits(
                 lambda packet: b'SomeMoreAclData' in packet.data)
             assertThat(acl_data_stream).emits(
                 lambda packet: b'SomeAclData' in packet.payload)
@@ -176,20 +176,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
             hci_packets.EventCode.CONNECTION_COMPLETE,
             hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
 
-        with self.cert.hci.new_event_stream() as cert_hci_event_stream, \
-            EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
+        with PyHci(self.cert) as cert_hci, \
             EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
 
             # CERT Enables scans and gets its address
-            self.cert.hci.send_command_with_complete(
+            cert_hci.send_command_with_complete(
                 hci_packets.WriteScanEnableBuilder(
                     hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
 
-            self.cert.hci.send_command_with_complete(
-                hci_packets.ReadBdAddrBuilder())
+            cert_hci.send_command_with_complete(hci_packets.ReadBdAddrBuilder())
 
             read_bd_addr = ReadBdAddrCompleteCapture()
-            assertThat(cert_hci_event_stream).emits(read_bd_addr)
+            assertThat(cert_hci.get_event_stream()).emits(read_bd_addr)
             cert_address = read_bd_addr.get().GetBdAddr()
 
             with EventStream(
@@ -202,15 +200,17 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass):
 
                 # Cert Accepts
                 connection_request = ConnectionRequestCapture()
-                assertThat(cert_hci_event_stream).emits(connection_request)
-                self.cert.hci.send_command_with_status(
+                assertThat(
+                    cert_hci.get_event_stream()).emits(connection_request)
+                cert_hci.send_command_with_status(
                     hci_packets.AcceptConnectionRequestBuilder(
                         connection_request.get().GetBdAddr(),
                         hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE))
 
                 # Cert gets ConnectionComplete with a handle and sends ACL data
                 connection_complete = ConnectionCompleteCapture()
-                assertThat(cert_hci_event_stream).emits(connection_complete)
+                assertThat(
+                    cert_hci.get_event_stream()).emits(connection_complete)
                 cert_handle = connection_complete.get().GetConnectionHandle()
 
                 self.enqueue_acl_data(
diff --git a/gd/hci/cert/py_hci.py b/gd/hci/cert/py_hci.py
new file mode 100644 (file)
index 0000000..e79b43f
--- /dev/null
@@ -0,0 +1,53 @@
+#!/usr/bin/env python3
+#
+#   Copyright 2020 - The Android Open Source Project
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+from google.protobuf import empty_pb2 as empty_proto
+from cert.event_stream import EventStream
+
+
+class PyHci(object):
+
+    def __init__(self, device):
+        self.device = device
+        self.event_stream = self.device.hci.new_event_stream()
+        self.acl_stream = EventStream(
+            self.device.hci.FetchAclPackets(empty_proto.Empty()))
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.clean_up()
+        return traceback is None
+
+    def __del__(self):
+        self.clean_up()
+
+    def clean_up(self):
+        self.event_stream.shutdown()
+        self.acl_stream.shutdown()
+
+    def get_event_stream(self):
+        return self.event_stream
+
+    def get_acl_stream(self):
+        return self.acl_stream
+
+    def send_command_with_complete(self, command):
+        self.device.hci.send_command_with_complete(command)
+
+    def send_command_with_status(self, command):
+        self.device.hci.send_command_with_status(command)