OSDN Git Service

Add advertisement abstraction in PyHal
authorZach Johnson <zachoverflow@google.com>
Fri, 13 Nov 2020 18:16:32 +0000 (10:16 -0800)
committerZach Johnson <zachoverflow@google.com>
Wed, 18 Nov 2020 22:12:43 +0000 (14:12 -0800)
Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --host
Change-Id: I07d3bb46876f86fb2aba74b49cddde7425703937

gd/cert/py_hal.py
gd/hal/cert/simple_hal_test.py
gd/hci/cert/direct_hci_test.py

index 59841bb..4c065ae 100644 (file)
@@ -30,6 +30,26 @@ from bluetooth_packets_python3 import RawBuilder
 from bluetooth_packets_python3.hci_packets import BroadcastFlag
 from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
 from bluetooth_packets_python3 import hci_packets
+from cert.matchers import HciMatchers
+from bluetooth_packets_python3.hci_packets import FilterDuplicates
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
+from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
+from bluetooth_packets_python3.hci_packets import PeerAddressType
+from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
+from bluetooth_packets_python3.hci_packets import GapData
+from bluetooth_packets_python3.hci_packets import GapDataType
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
+from bluetooth_packets_python3.hci_packets import Operation
+from bluetooth_packets_python3.hci_packets import OwnAddressType
+from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
+from bluetooth_packets_python3.hci_packets import Enable
+from bluetooth_packets_python3.hci_packets import FragmentPreference
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
+from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
+from bluetooth_packets_python3.hci_packets import EnabledSet
+from bluetooth_packets_python3.hci_packets import OpCode
 
 
 class PyHalAclConnection(IEventStream):
@@ -50,6 +70,47 @@ class PyHalAclConnection(IEventStream):
         return self.our_acl_stream.get_event_queue()
 
 
+class PyHalAdvertisement(object):
+
+    def __init__(self, handle, py_hal):
+        self.handle = handle
+        self.py_hal = py_hal
+
+    def set_data(self, complete_name):
+        data = GapData()
+        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
+        data.data = list(bytes(complete_name))
+        self.py_hal.send_hci_command(
+            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
+                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
+
+    def set_scan_response(self, shortened_name):
+        data = GapData()
+        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
+        data.data = list(bytes(shortened_name))
+        self.py_hal.send_hci_command(
+            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
+                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
+
+    def start(self):
+        enabled_set = EnabledSet()
+        enabled_set.advertising_handle = self.handle
+        enabled_set.duration = 0
+        enabled_set.max_extended_advertising_events = 0
+        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
+        assertThat(self.py_hal.get_hci_event_stream()).emits(
+            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
+
+    def stop(self):
+        enabled_set = EnabledSet()
+        enabled_set.advertising_handle = self.handle
+        enabled_set.duration = 0
+        enabled_set.max_extended_advertising_events = 0
+        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set]))
+        assertThat(self.py_hal.get_hci_event_stream()).emits(
+            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
+
+
 class PyHal(Closable):
 
     def __init__(self, device):
@@ -114,3 +175,26 @@ class PyHal(Closable):
 
         handle = connection_complete.get().GetConnectionHandle()
         return PyHalAclConnection(handle, self.acl_stream, self.device)
+
+    def create_advertisement(self,
+                             handle,
+                             own_address,
+                             properties=LegacyAdvertisingProperties.ADV_IND,
+                             min_interval=400,
+                             max_interval=450,
+                             channel_map=7,
+                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
+                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
+                             peer_address='00:00:00:00:00:00',
+                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
+                             tx_power=0xF8,
+                             sid=1,
+                             scan_request_notification=Enable.DISABLED):
+
+        self.send_hci_command(
+            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
+                                                            own_address_type, peer_address_type, peer_address,
+                                                            filter_policy, tx_power, sid, scan_request_notification))
+
+        self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
+        return PyHalAdvertisement(handle, self)
index 4b92632..6eb6e3c 100644 (file)
@@ -94,48 +94,20 @@ class SimpleHalTest(GdBaseTestClass):
             hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                        hci_packets.FilterDuplicates.DISABLED, 0, 0))
 
-        # CERT Advertises
-        advertising_handle = 0
-        self.cert_hal.send_hci_command(
-            hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
-                advertising_handle,
-                hci_packets.LegacyAdvertisingProperties.ADV_IND,
-                512,
-                768,
-                7,
-                hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
-                hci_packets.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
-                'A6:A5:A4:A3:A2:A1',
-                hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
-                0x7F,
-                0,  # SID
-                hci_packets.Enable.DISABLED  # Scan request notification
-            ))
-
-        self.cert_hal.send_hci_command(
-            hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
-
-        gap_name = hci_packets.GapData()
-        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
-        gap_name.data = list(bytes(b'Im_A_Cert'))
-
-        self.cert_hal.send_hci_command(
-            hci_packets.LeSetExtendedAdvertisingDataBuilder(
-                advertising_handle, hci_packets.Operation.COMPLETE_ADVERTISEMENT,
-                hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
-        enabled_set = hci_packets.EnabledSet()
-        enabled_set.advertising_handle = advertising_handle
-        enabled_set.duration = 0
-        enabled_set.max_extended_advertising_events = 0
-        self.cert_hal.send_hci_command(
-            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.ENABLED, [enabled_set]))
+        advertisement = self.cert_hal.create_advertisement(
+            0,
+            '0C:05:04:03:02:01',
+            min_interval=512,
+            max_interval=768,
+            peer_address='A6:A5:A4:A3:A2:A1',
+            tx_power=0x7f,
+            sid=1)
+        advertisement.set_data(b'Im_A_Cert')
+        advertisement.start()
 
         assertThat(self.dut_hal.get_hci_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)
 
-        # Disable Advertising
-        self.cert_hal.send_hci_command(
-            hci_packets.LeSetExtendedAdvertisingEnableBuilder(hci_packets.Enable.DISABLED, [enabled_set]))
-
+        advertisement.stop()
         # Disable Scanning
         self.dut_hal.send_hci_command(
             hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
index c9ba1b9..3a3fb8d 100644 (file)
@@ -244,39 +244,16 @@ class DirectHciTest(GdBaseTestClass):
                                               OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                               'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))
 
-        # CERT Advertises
-        advertising_handle = 1
-        self.cert_hal.send_hci_command(
-            LeSetExtendedAdvertisingLegacyParametersBuilder(
-                advertising_handle,
-                LegacyAdvertisingProperties.ADV_IND,
-                512,
-                768,
-                7,
-                OwnAddressType.RANDOM_DEVICE_ADDRESS,
-                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
-                'A6:A5:A4:A3:A2:A1',
-                AdvertisingFilterPolicy.ALL_DEVICES,
-                0x7F,
-                0,  # SID
-                Enable.DISABLED  # Scan request notification
-            ))
-
-        self.cert_hal.send_hci_command(
-            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0C:05:04:03:02:01'))
-
-        gap_name = GapData()
-        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
-        gap_name.data = list(bytes(b'Im_A_Cert'))
-
-        self.cert_hal.send_hci_command(
-            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
-                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
-        enabled_set = EnabledSet()
-        enabled_set.advertising_handle = 1
-        enabled_set.duration = 0
-        enabled_set.max_extended_advertising_events = 0
-        self.cert_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
+        advertisement = self.cert_hal.create_advertisement(
+            1,
+            '0C:05:04:03:02:01',
+            min_interval=512,
+            max_interval=768,
+            peer_address='A6:A5:A4:A3:A2:A1',
+            tx_power=0x7f,
+            sid=0)
+        advertisement.set_data(b'Im_A_Cert')
+        advertisement.start()
 
         # LeConnectionComplete
         self._verify_le_connection_complete()