OSDN Git Service

Add an advertisment abstraction to PyHci
authorZach Johnson <zachoverflow@google.com>
Fri, 13 Nov 2020 06:23:41 +0000 (22:23 -0800)
committerZach Johnson <zachoverflow@google.com>
Wed, 18 Nov 2020 22:12:43 +0000 (14:12 -0800)
Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --host
Change-Id: I919e9769023f53ac645313a151fdff8e70417e26

gd/cert/py_hci.py
gd/hci/cert/direct_hci_test.py

index 2d14aaa..cae98ea 100644 (file)
@@ -24,6 +24,26 @@ from cert.captures import HciCaptures
 from bluetooth_packets_python3 import hci_packets
 from cert.truth import assertThat
 from hci.facade import facade_pb2 as hci_facade
+from cert.matchers import HciMatchers
+from bluetooth_packets_python3.hci_packets import FilterDuplicates
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
+from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
+from bluetooth_packets_python3.hci_packets import PeerAddressType
+from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
+from bluetooth_packets_python3.hci_packets import GapData
+from bluetooth_packets_python3.hci_packets import GapDataType
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
+from bluetooth_packets_python3.hci_packets import Operation
+from bluetooth_packets_python3.hci_packets import OwnAddressType
+from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
+from bluetooth_packets_python3.hci_packets import Enable
+from bluetooth_packets_python3.hci_packets import FragmentPreference
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
+from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
+from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
+from bluetooth_packets_python3.hci_packets import EnabledSet
+from bluetooth_packets_python3.hci_packets import OpCode
 
 
 class PyHciAclConnection(IEventStream):
@@ -51,6 +71,38 @@ class PyHciAclConnection(IEventStream):
         return self.our_acl_stream.get_event_queue()
 
 
+class PyHciAdvertisement(object):
+
+    def __init__(self, handle, py_hci):
+        self.handle = handle
+        self.py_hci = py_hci
+
+    def set_data(self, complete_name):
+        data = GapData()
+        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
+        data.data = list(bytes(complete_name))
+        self.py_hci.send_command_with_complete(
+            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
+                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
+
+    def set_scan_response(self, shortened_name):
+        data = GapData()
+        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
+        data.data = list(bytes(shortened_name))
+        self.py_hci.send_command_with_complete(
+            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
+                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
+
+    def start(self):
+        enabled_set = EnabledSet()
+        enabled_set.advertising_handle = self.handle
+        enabled_set.duration = 0
+        enabled_set.max_extended_advertising_events = 0
+        self.py_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
+        assertThat(self.py_hci.get_event_stream()).emits(
+            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
+
+
 class PyHci(Closable):
 
     event_stream = None
@@ -139,3 +191,26 @@ class PyHci(Closable):
         if self.acl_stream is None:
             raise Exception("Please construct '%s' with acl_streaming=True!" % self.__class__.__name__)
         return PyHciAclConnection(handle, self.acl_stream, self.device)
+
+    def create_advertisement(self,
+                             handle,
+                             own_address,
+                             properties=LegacyAdvertisingProperties.ADV_IND,
+                             min_interval=400,
+                             max_interval=450,
+                             channel_map=7,
+                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
+                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
+                             peer_address='00:00:00:00:00:00',
+                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
+                             tx_power=0xF8,
+                             sid=1,
+                             scan_request_notification=Enable.DISABLED):
+
+        self.send_command_with_complete(
+            LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
+                                                            own_address_type, peer_address_type, peer_address,
+                                                            filter_policy, tx_power, sid, scan_request_notification))
+
+        self.send_command_with_complete(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
+        return PyHciAdvertisement(handle, self)
index c0e6a04..c9ba1b9 100644 (file)
@@ -215,52 +215,10 @@ class DirectHciTest(GdBaseTestClass):
                                               OwnAddressType.RANDOM_DEVICE_ADDRESS, AddressType.RANDOM_DEVICE_ADDRESS,
                                               '0D:05:04:03:02:01', 1, [phy_scan_params]))
 
-        # DUT Advertises
-        advertising_handle = 0
-        self.dut_hci.send_command_with_complete(
-            LeSetExtendedAdvertisingLegacyParametersBuilder(
-                advertising_handle,
-                LegacyAdvertisingProperties.ADV_IND,
-                400,
-                450,
-                7,
-                OwnAddressType.RANDOM_DEVICE_ADDRESS,
-                PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
-                '00:00:00:00:00:00',
-                AdvertisingFilterPolicy.ALL_DEVICES,
-                0xF8,
-                1,  #SID
-                Enable.DISABLED  # Scan request notification
-            ))
-
-        self.dut_hci.send_command_with_complete(
-            LeSetExtendedAdvertisingRandomAddressBuilder(advertising_handle, '0D:05:04:03:02:01'))
-
-        gap_name = GapData()
-        gap_name.data_type = GapDataType.COMPLETE_LOCAL_NAME
-        gap_name.data = list(bytes(b'Im_The_DUT'))
-
-        self.dut_hci.send_command_with_complete(
-            LeSetExtendedAdvertisingDataBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
-                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_name]))
-
-        gap_short_name = GapData()
-        gap_short_name.data_type = GapDataType.SHORTENED_LOCAL_NAME
-        gap_short_name.data = list(bytes(b'Im_The_D'))
-
-        self.dut_hci.send_command_with_complete(
-            LeSetExtendedAdvertisingScanResponseBuilder(advertising_handle, Operation.COMPLETE_ADVERTISEMENT,
-                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [gap_short_name]))
-
-        enabled_set = EnabledSet()
-        enabled_set.advertising_handle = advertising_handle
-        enabled_set.duration = 0
-        enabled_set.max_extended_advertising_events = 0
-        self.dut_hci.send_command_with_complete(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
-
-        # Check for success of Enable
-        assertThat(self.dut_hci.get_event_stream()).emits(
-            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
+        advertisement = self.dut_hci.create_advertisement(0, '0D:05:04:03:02:01')
+        advertisement.set_data(b'Im_The_DUT')
+        advertisement.set_scan_response(b'Im_The_D')
+        advertisement.start()
 
         (dut_handle, cert_handle) = self._verify_le_connection_complete()