From 3d626de7a762e419b15fbda8ad1f9445c75e27d0 Mon Sep 17 00:00:00 2001 From: Zach Johnson Date: Fri, 28 Feb 2020 16:59:04 -0800 Subject: [PATCH] Add PyHci, to simplify interactions with HCI (not HCI_INTERFACES) Test: cert/run --host --test_filter=AclManagerTest Change-Id: Id9b7d89aba3369b32fa17759c912439ba43d37b3 --- gd/hci/cert/acl_manager_test.py | 48 ++++++++++++++++++------------------- gd/hci/cert/py_hci.py | 53 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+), 24 deletions(-) create mode 100644 gd/hci/cert/py_hci.py diff --git a/gd/hci/cert/acl_manager_test.py b/gd/hci/cert/acl_manager_test.py index e3d475d74..ab5dc4e88 100644 --- a/gd/hci/cert/acl_manager_test.py +++ b/gd/hci/cert/acl_manager_test.py @@ -31,6 +31,7 @@ from bluetooth_packets_python3 import hci_packets from captures import ReadBdAddrCompleteCapture from captures import ConnectionCompleteCapture from captures import ConnectionRequestCapture +from py_hci import PyHci class AclManagerTest(GdFacadeOnlyBaseTestClass): @@ -52,20 +53,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ - EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ + with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # CERT Enables scans and gets its address - self.cert.hci.send_command_with_complete( + cert_hci.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) - self.cert.hci.send_command_with_complete( - hci_packets.ReadBdAddrBuilder()) + cert_hci.send_command_with_complete(hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() - assertThat(cert_hci_event_stream).emits(read_bd_addr) + assertThat(cert_hci.get_event_stream()).emits(read_bd_addr) cert_address = read_bd_addr.get().GetBdAddr() with EventStream( @@ -78,16 +77,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): # Cert Accepts connection_request = ConnectionRequestCapture() - assertThat(cert_hci_event_stream).emits(connection_request) + assertThat( + cert_hci.get_event_stream()).emits(connection_request) - self.cert.hci.send_command_with_status( + cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() - assertThat(cert_hci_event_stream).emits(connection_complete) + assertThat( + cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( @@ -110,7 +111,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT' ))) - assertThat(cert_acl_data_stream).emits( + assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) assertThat(acl_data_stream).emits( lambda packet: b'SomeAclData' in packet.payload) @@ -121,8 +122,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ - EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ + with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: @@ -133,7 +133,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): neighbor_facade.EnableMsg(enabled=True)) # Cert connects - self.cert.hci.send_command_with_status( + cert_hci.send_command_with_status( hci_packets.CreateConnectionBuilder( dut_address.decode('utf-8'), 0xcc18, # Packet Type @@ -155,7 +155,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): ))) connection_complete = ConnectionCompleteCapture() - assertThat(cert_hci_event_stream).emits(connection_complete) + assertThat(cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( @@ -165,7 +165,7 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): bytes( b'\x26\x00\x07\x00This is just SomeAclData from the Cert')) - assertThat(cert_acl_data_stream).emits( + assertThat(cert_hci.get_acl_stream()).emits( lambda packet: b'SomeMoreAclData' in packet.data) assertThat(acl_data_stream).emits( lambda packet: b'SomeAclData' in packet.payload) @@ -176,20 +176,18 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): hci_packets.EventCode.CONNECTION_COMPLETE, hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED) - with self.cert.hci.new_event_stream() as cert_hci_event_stream, \ - EventStream(self.cert.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \ + with PyHci(self.cert) as cert_hci, \ EventStream(self.dut.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream: # CERT Enables scans and gets its address - self.cert.hci.send_command_with_complete( + cert_hci.send_command_with_complete( hci_packets.WriteScanEnableBuilder( hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN)) - self.cert.hci.send_command_with_complete( - hci_packets.ReadBdAddrBuilder()) + cert_hci.send_command_with_complete(hci_packets.ReadBdAddrBuilder()) read_bd_addr = ReadBdAddrCompleteCapture() - assertThat(cert_hci_event_stream).emits(read_bd_addr) + assertThat(cert_hci.get_event_stream()).emits(read_bd_addr) cert_address = read_bd_addr.get().GetBdAddr() with EventStream( @@ -202,15 +200,17 @@ class AclManagerTest(GdFacadeOnlyBaseTestClass): # Cert Accepts connection_request = ConnectionRequestCapture() - assertThat(cert_hci_event_stream).emits(connection_request) - self.cert.hci.send_command_with_status( + assertThat( + cert_hci.get_event_stream()).emits(connection_request) + cert_hci.send_command_with_status( hci_packets.AcceptConnectionRequestBuilder( connection_request.get().GetBdAddr(), hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE)) # Cert gets ConnectionComplete with a handle and sends ACL data connection_complete = ConnectionCompleteCapture() - assertThat(cert_hci_event_stream).emits(connection_complete) + assertThat( + cert_hci.get_event_stream()).emits(connection_complete) cert_handle = connection_complete.get().GetConnectionHandle() self.enqueue_acl_data( diff --git a/gd/hci/cert/py_hci.py b/gd/hci/cert/py_hci.py new file mode 100644 index 000000000..e79b43fc2 --- /dev/null +++ b/gd/hci/cert/py_hci.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python3 +# +# Copyright 2020 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.protobuf import empty_pb2 as empty_proto +from cert.event_stream import EventStream + + +class PyHci(object): + + def __init__(self, device): + self.device = device + self.event_stream = self.device.hci.new_event_stream() + self.acl_stream = EventStream( + self.device.hci.FetchAclPackets(empty_proto.Empty())) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.clean_up() + return traceback is None + + def __del__(self): + self.clean_up() + + def clean_up(self): + self.event_stream.shutdown() + self.acl_stream.shutdown() + + def get_event_stream(self): + return self.event_stream + + def get_acl_stream(self): + return self.acl_stream + + def send_command_with_complete(self, command): + self.device.hci.send_command_with_complete(command) + + def send_command_with_status(self, command): + self.device.hci.send_command_with_status(command) -- 2.11.0