OSDN Git Service

HciCaptures: Add CommandComplete
authorMyles Watson <mylesgw@google.com>
Thu, 24 Sep 2020 15:51:14 +0000 (08:51 -0700)
committerMyles Watson <mylesgw@google.com>
Fri, 25 Sep 2020 22:47:55 +0000 (15:47 -0700)
Add public methods to match or extract events in matchers.py.

Call the same private static methods to extract or match events.

- CommandComplete
- LeMetaEvent
- LeConnectionComplete

Test: cert/run --host
Bug: 145832107
Tag: #gd-refactor
Change-Id: I308e97cfe60dbcd351f211ff836c0516019412db

gd/cert/captures.py
gd/cert/matchers.py

index cc50844..c10786c 100644 (file)
@@ -19,6 +19,7 @@ from bluetooth_packets_python3 import hci_packets
 from bluetooth_packets_python3 import l2cap_packets
 from bluetooth_packets_python3.l2cap_packets import CommandCode, LeCommandCode
 from cert.capture import Capture
+from cert.matchers import HciMatchers
 from cert.matchers import L2capMatchers
 from cert.matchers import SecurityMatchers
 from security.facade_pb2 import UiMsgType
@@ -65,35 +66,34 @@ class HciCaptures(object):
     @staticmethod
     def ReadBdAddrCompleteCapture():
         return Capture(
-            lambda packet: packet.event[0:5] == b'\x0e\x0a\x01\x09\x10', lambda packet: hci_packets.ReadBdAddrCompleteView(
-                hci_packets.CommandCompleteView(
-                    hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet.event))))))
+            HciMatchers.CommandComplete(hci_packets.OpCode.READ_BD_ADDR),
+            lambda packet: hci_packets.ReadBdAddrCompleteView(HciMatchers.ExtractMatchingCommandComplete(packet.event, hci_packets.OpCode.READ_BD_ADDR)))
 
     @staticmethod
     def ConnectionRequestCapture():
         return Capture(
-            lambda packet: packet.event[0:2] == b'\x04\x0a', lambda packet: hci_packets.ConnectionRequestView(
-                hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet.event)))))
+            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_REQUEST),
+            lambda packet: hci_packets.ConnectionRequestView(
+                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_REQUEST)))
 
     @staticmethod
     def ConnectionCompleteCapture():
         return Capture(
-            lambda packet: packet.event[0:3] == b'\x03\x0b\x00', lambda packet: hci_packets.ConnectionCompleteView(
-                hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet.event)))))
+            HciMatchers.EventWithCode(hci_packets.EventCode.CONNECTION_COMPLETE),
+            lambda packet: hci_packets.ConnectionCompleteView(
+                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.CONNECTION_COMPLETE)))
 
     @staticmethod
     def DisconnectionCompleteCapture():
         return Capture(
-            lambda packet: packet.event[0:2] == b'\x05\x04', lambda packet: hci_packets.DisconnectionCompleteView(
-                hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet.event)))))
+            HciMatchers.EventWithCode(hci_packets.EventCode.DISCONNECTION_COMPLETE),
+            lambda packet: hci_packets.DisconnectionCompleteView(
+                HciMatchers.ExtractEventWithCode(packet.event, hci_packets.EventCode.DISCONNECTION_COMPLETE)))
 
     @staticmethod
     def LeConnectionCompleteCapture():
-        return Capture(
-            lambda packet: packet.event[0] == 0x3e and (packet.event[2] == 0x01 or packet.event[2] == 0x0a),
-            lambda packet: hci_packets.LeConnectionCompleteView(
-                hci_packets.LeMetaEventView(
-                    hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet.event))))))
+        return Capture(HciMatchers.LeConnectionComplete(),
+                       lambda packet: HciMatchers.ExtractLeConnectionComplete(packet.event))
 
 
 class L2capCaptures(object):
index dac7562..3a4af3c 100644 (file)
@@ -28,30 +28,86 @@ from bluetooth_packets_python3.l2cap_packets import LeCreditBasedConnectionRespo
 class HciMatchers(object):
 
     @staticmethod
-    def CommandComplete(opcode=None, num_complete=1):
-        return lambda msg: HciMatchers._is_matching_command_complete(msg.event, opcode, num_complete)
+    def CommandComplete(opcode=None):
+        return lambda msg: HciMatchers._is_matching_command_complete(msg.event, opcode)
 
     @staticmethod
-    def _is_matching_command_complete(packet_bytes, opcode=None, num_complete=1):
-        hci_event = HciMatchers.extract_hci_event_with_code(packet_bytes, EventCode.COMMAND_COMPLETE)
-        if hci_event is None:
-            return False
-        frame = hci_packets.CommandCompleteView(hci_event)
-        return (opcode is None or frame.GetCommandOpCode() == opcode) and\
-               frame.GetNumHciCommandPackets() == num_complete
+    def ExtractMatchingCommandComplete(packet_bytes, opcode=None):
+        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode)
+
+    @staticmethod
+    def _is_matching_command_complete(packet_bytes, opcode=None):
+        return HciMatchers._extract_matching_command_complete(packet_bytes, opcode) is not None
+
+    @staticmethod
+    def _extract_matching_command_complete(packet_bytes, opcode=None):
+        event = HciMatchers._extract_matching_event(packet_bytes, EventCode.COMMAND_COMPLETE)
+        if event is None:
+            return None
+        complete = hci_packets.CommandCompleteView(event)
+        if opcode is None or complete is None:
+            return complete
+        else:
+            if complete.GetCommandOpCode() != opcode:
+                return None
+            else:
+                return complete
 
     @staticmethod
     def EventWithCode(event_code):
-        return lambda msg: HciMatchers.extract_hci_event_with_code(msg.event, event_code)
+        return lambda msg: HciMatchers._is_matching_event(msg.event, event_code)
 
     @staticmethod
-    def extract_hci_event_with_code(packet_bytes, event_code=None):
-        hci_event = hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-        if hci_event is None:
+    def ExtractEventWithCode(packet_bytes, event_code):
+        return HciMatchers._extract_matching_event(packet_bytes, event_code)
+
+    @staticmethod
+    def _is_matching_event(packet_bytes, event_code):
+        return HciMatchers._extract_matching_event(packet_bytes, event_code) is not None
+
+    @staticmethod
+    def _extract_matching_event(packet_bytes, event_code):
+        event = hci_packets.EventPacketView(bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+        if event is None:
             return None
-        if event_code is not None and hci_event.GetEventCode() != event_code:
+        if event_code is not None and event.GetEventCode() != event_code:
             return None
-        return hci_event
+        return event
+
+    @staticmethod
+    def LeEventWithCode(subevent_code):
+        return lambda msg: HciMatchers._extract_matching_le_event(msg.event, subevent_code) is not None
+
+    @staticmethod
+    def ExtractLeEventWithCode(packet_bytes, subevent_code):
+        return HciMatchers._extract_matching_le_event(packet_bytes, subevent_code)
+
+    @staticmethod
+    def _extract_matching_le_event(packet_bytes, subevent_code):
+        event = hci_packets.LeMetaEventView(
+            HciMatchers._extract_matching_event(packet_bytes, hci_packets.EventCode.LE_META_EVENT))
+        if event is None:
+            return None
+        if event.GetSubeventCode() != subevent_code:
+            return None
+        return event
+
+    @staticmethod
+    def LeConnectionComplete():
+        return lambda msg: HciMatchers._extract_le_connection_complete(msg.event) is not None
+
+    @staticmethod
+    def ExtractLeConnectionComplete(packet_bytes):
+        return HciMatchers._extract_le_connection_complete(packet_bytes)
+
+    @staticmethod
+    def _extract_le_connection_complete(packet_bytes):
+        event = hci_packets.LeConnectionCompleteView(
+            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.CONNECTION_COMPLETE))
+        if event is not None:
+            return event
+        return hci_packets.LeEnhancedConnectionCompleteView(
+            HciMatchers._extract_matching_le_event(packet_bytes, hci_packets.SubeventCode.ENHANCED_CONNECTION_COMPLETE))
 
     @staticmethod
     def LinkKeyRequest():
@@ -106,7 +162,7 @@ class NeighborMatchers(object):
 
     @staticmethod
     def _is_matching_inquiry_result(packet, address):
-        hci_event = HciMatchers.extract_hci_event_with_code(packet, EventCode.INQUIRY_RESULT)
+        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT)
         if hci_event is None:
             return False
         inquiry_view = hci_packets.InquiryResultView(hci_event)
@@ -121,7 +177,7 @@ class NeighborMatchers(object):
 
     @staticmethod
     def _is_matching_inquiry_result_with_rssi(packet, address):
-        hci_event = HciMatchers.extract_hci_event_with_code(packet, EventCode.INQUIRY_RESULT_WITH_RSSI)
+        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.INQUIRY_RESULT_WITH_RSSI)
         if hci_event is None:
             return False
         inquiry_view = hci_packets.InquiryResultWithRssiView(hci_event)
@@ -136,7 +192,7 @@ class NeighborMatchers(object):
 
     @staticmethod
     def _is_matching_extended_inquiry_result(packet, address):
-        hci_event = HciMatchers.extract_hci_event_with_code(packet, EventCode.EXTENDED_INQUIRY_RESULT)
+        hci_event = HciMatchers.ExtractEventWithCode(packet, EventCode.EXTENDED_INQUIRY_RESULT)
         if hci_event is None:
             return False
         extended_view = hci_packets.ExtendedInquiryResultView(hci_event)