OSDN Git Service

Add some wait commands inside PyHal
authorZach Johnson <zachoverflow@google.com>
Sat, 14 Nov 2020 23:50:00 +0000 (15:50 -0800)
committerZach Johnson <zachoverflow@google.com>
Wed, 18 Nov 2020 22:14:20 +0000 (14:14 -0800)
Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --host
Change-Id: Ic66d8fcd09f3c930de0a8b1063ec355e86df4d20

gd/cert/matchers.py
gd/cert/py_hal.py
gd/hal/cert/simple_hal_test.py

index a942f6e..a912973 100644 (file)
@@ -57,6 +57,32 @@ class HciMatchers(object):
                 return complete
 
     @staticmethod
+    def CommandStatus(opcode=None):
+        return lambda msg: HciMatchers._is_matching_command_status(msg.payload, opcode)
+
+    @staticmethod
+    def ExtractMatchingCommandStatus(packet_bytes, opcode=None):
+        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)
+
+    @staticmethod
+    def _is_matching_command_status(packet_bytes, opcode=None):
+        return HciMatchers._extract_matching_command_status(packet_bytes, opcode) is not None
+
+    @staticmethod
+    def _extract_matching_command_status(packet_bytes, opcode=None):
+        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_STATUS)
+        if event is None:
+            return None
+        complete = hci_packets.CommandStatusView(event)
+        if opcode is None or complete is None:
+            return complete
+        else:
+            if complete.GetCommandOpCode() != opcode:
+                return None
+            else:
+                return complete
+
+    @staticmethod
     def EventWithCode(event_code):
         return lambda msg: HciMatchers._is_matching_event(msg.payload, event_code)
 
index 749b138..96fb169 100644 (file)
@@ -83,6 +83,7 @@ class PyHalAdvertisement(object):
         self.py_hal.send_hci_command(
             LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                 FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
+        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA)
 
     def set_scan_response(self, shortened_name):
         data = GapData()
@@ -91,6 +92,7 @@ class PyHalAdvertisement(object):
         self.py_hal.send_hci_command(
             LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                         FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
+        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE)
 
     def start(self):
         enabled_set = EnabledSet()
@@ -98,8 +100,7 @@ class PyHalAdvertisement(object):
         enabled_set.duration = 0
         enabled_set.max_extended_advertising_events = 0
         self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set]))
-        assertThat(self.py_hal.get_hci_event_stream()).emits(
-            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
+        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)
 
     def stop(self):
         enabled_set = EnabledSet()
@@ -107,8 +108,7 @@ class PyHalAdvertisement(object):
         enabled_set.duration = 0
         enabled_set.max_extended_advertising_events = 0
         self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set]))
-        assertThat(self.py_hal.get_hci_event_stream()).emits(
-            HciMatchers.CommandComplete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE))
+        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)
 
 
 class PyHal(Closable):
@@ -128,6 +128,12 @@ class PyHal(Closable):
     def get_hci_event_stream(self):
         return self.hci_event_stream
 
+    def wait_for_complete(self, opcode):
+        assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode))
+
+    def wait_for_status(self, opcode):
+        assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode))
+
     def get_acl_stream(self):
         return self.acl_stream
 
@@ -149,6 +155,7 @@ class PyHal(Closable):
 
     def set_random_le_address(self, addr):
         self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr))
+        self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS)
 
     def set_scan_parameters(self):
         phy_scan_params = hci_packets.PhyScanParameters()
@@ -160,20 +167,23 @@ class PyHal(Closable):
             hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                            hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                            [phy_scan_params]))
+        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)
 
     def start_scanning(self):
         self.send_hci_command(
             hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                        hci_packets.FilterDuplicates.DISABLED, 0, 0))
+        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)
 
     def stop_scanning(self):
         self.send_hci_command(
             hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED,
                                                        hci_packets.FilterDuplicates.DISABLED, 0, 0))
+        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)
 
     def reset(self):
         self.send_hci_command(hci_packets.ResetBuilder())
-        assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(OpCode.RESET))
+        self.wait_for_complete(OpCode.RESET)
 
     def enable_inquiry_and_page_scan(self):
         self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN))
@@ -218,6 +228,7 @@ class PyHal(Closable):
             hci_packets.LeExtendedCreateConnectionBuilder(
                 hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                 hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
+        self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION)
 
     def add_to_connect_list(self, remote_addr):
         self.send_hci_command(
@@ -264,6 +275,9 @@ class PyHal(Closable):
             LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                             own_address_type, peer_address_type, peer_address,
                                                             filter_policy, tx_power, sid, scan_request_notification))
+        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)
 
         self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address))
+        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS)
+
         return PyHalAdvertisement(handle, self)
index 81c4e5f..7839684 100644 (file)
@@ -103,7 +103,6 @@ class SimpleHalTest(GdBaseTestClass):
         self.dut_hal.stop_scanning()
 
     def test_le_connection_dut_advertises(self):
-        # Cert Connects
         self.cert_hal.set_random_le_address('0C:05:04:03:02:01')
         self.cert_hal.initiate_le_connection('0D:05:04:03:02:01')