OSDN Git Service

Add information response matchers
authorZach Johnson <zachoverflow@google.com>
Tue, 3 Mar 2020 20:43:04 +0000 (12:43 -0800)
committerZach Johnson <zachoverflow@google.com>
Tue, 3 Mar 2020 22:32:28 +0000 (14:32 -0800)
Test: cert/run --host --test_filter=L2capTest
Change-Id: I989b35eb6ed05f1d845d332e238a44eb136f966a

gd/cert/matchers.py
gd/l2cap/classic/cert/l2cap_test.py

index 596f38d..a30becf 100644 (file)
@@ -18,6 +18,7 @@ import bluetooth_packets_python3 as bt_packets
 from bluetooth_packets_python3 import l2cap_packets
 from bluetooth_packets_python3.l2cap_packets import CommandCode
 from bluetooth_packets_python3.l2cap_packets import ConnectionResponseResult
+from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
 import logging
 
 
@@ -73,6 +74,13 @@ class L2capMatchers(object):
         return lambda packet: L2capMatchers._basic_frame_for(packet, scid)
 
     @staticmethod
+    def InformationResponseExtendedFeatures(supports_ertm=None,
+                                            supports_streaming=None,
+                                            supports_fcs=None,
+                                            supports_fixed_channels=None):
+        return lambda packet: L2capMatchers._is_matching_information_response_extended_features(packet, supports_ertm, supports_streaming, supports_fcs, supports_fixed_channels)
+
+    @staticmethod
     def _basic_frame(packet):
         if packet is None:
             return None
@@ -163,3 +171,35 @@ class L2capMatchers(object):
         response = l2cap_packets.DisconnectionResponseView(frame)
         return response.GetSourceCid() == scid and response.GetDestinationCid(
         ) == dcid
+
+    @staticmethod
+    def _information_response_with_type(packet, info_type):
+        frame = L2capMatchers.control_frame_with_code(
+            packet, CommandCode.INFORMATION_RESPONSE)
+        if frame is None:
+            return None
+        response = l2cap_packets.InformationResponseView(frame)
+        if response.GetInfoType() != info_type:
+            return None
+        return response
+
+    @staticmethod
+    def _is_matching_information_response_extended_features(
+            packet, supports_ertm, supports_streaming, supports_fcs,
+            supports_fixed_channels):
+        frame = L2capMatchers._information_response_with_type(
+            packet, InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED)
+        if frame is None:
+            return False
+        features = l2cap_packets.InformationResponseExtendedFeaturesView(frame)
+        if supports_ertm is not None and features.GetEnhancedRetransmissionMode(
+        ) != supports_ertm:
+            return False
+        if supports_streaming is not None and features.GetStreamingMode != supports_streaming:
+            return False
+        if supports_fcs is not None and features.GetFcsOption() != supports_fcs:
+            return False
+        if supports_fixed_channels is not None and features.GetFixedChannels(
+        ) != supports_fixed_channels:
+            return False
+        return True
index 4cbf982..115295c 100644 (file)
@@ -37,6 +37,7 @@ from bluetooth_packets_python3.l2cap_packets import Final
 from bluetooth_packets_python3.l2cap_packets import CommandCode
 from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
 from bluetooth_packets_python3.l2cap_packets import Poll
+from bluetooth_packets_python3.l2cap_packets import InformationRequestInfoType
 from cert_l2cap import CertL2cap
 
 # Assemble a sample packet. TODO: Use RawBuilder
@@ -271,19 +272,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             1, information_request)
         self.cert_send_b_frame(information_request_l2cap)
 
-        def is_correct_information_response(l2cap_packet):
-            l2cap_control_view = l2cap_packets.ControlView(
-                l2cap_packet.GetPayload())
-            if l2cap_control_view.GetCode(
-            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                return False
-            information_response_view = l2cap_packets.InformationResponseView(
-                l2cap_control_view)
-            return information_response_view.GetInfoType(
-            ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
-
         assertThat(self.cert_l2cap.get_control_channel()).emits(
-            is_correct_information_response)
+            L2capMatchers.InformationResponseExtendedFeatures())
 
     def test_extended_feature_info_response_ertm(self):
         """
@@ -301,23 +291,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             1, information_request)
         self.cert_send_b_frame(information_request_l2cap)
 
-        def is_correct_information_response(l2cap_packet):
-            l2cap_control_view = l2cap_packets.ControlView(
-                l2cap_packet.GetPayload())
-            if l2cap_control_view.GetCode(
-            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                return False
-            information_response_view = l2cap_packets.InformationResponseView(
-                l2cap_control_view)
-            if information_response_view.GetInfoType(
-            ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
-                return False
-            extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
-                information_response_view)
-            return extended_features_view.GetEnhancedRetransmissionMode()
-
         assertThat(self.cert_l2cap.get_control_channel()).emits(
-            is_correct_information_response)
+            L2capMatchers.InformationResponseExtendedFeatures(
+                supports_ertm=True))
 
     def test_extended_feature_info_response_streaming(self):
         """
@@ -335,27 +311,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             1, information_request)
         self.cert_send_b_frame(information_request_l2cap)
 
-        def is_correct_information_response(l2cap_packet):
-            packet_bytes = l2cap_packet.payload
-            l2cap_view = l2cap_packets.BasicFrameView(
-                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-            if l2cap_view.GetChannelId() != 1:
-                return False
-            l2cap_control_view = l2cap_packets.ControlView(
-                l2cap_view.GetPayload())
-            if l2cap_control_view.GetCode(
-            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                return False
-            information_response_view = l2cap_packets.InformationResponseView(
-                l2cap_control_view)
-            if information_response_view.GetInfoType(
-            ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
-                return False
-            extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
-                information_response_view)
-            return extended_features_view.GetStreamingMode()
-
-        assertThat(self.cert_acl).emits(is_correct_information_response)
+        assertThat(self.cert_l2cap.get_control_channel()).emits(
+            L2capMatchers.InformationResponseExtendedFeatures(
+                supports_streaming=True))
 
     def test_extended_feature_info_response_fcs(self):
         """
@@ -373,23 +331,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             1, information_request)
         self.cert_send_b_frame(information_request_l2cap)
 
-        def is_correct_information_response(l2cap_packet):
-            l2cap_control_view = l2cap_packets.ControlView(
-                l2cap_packet.GetPayload())
-            if l2cap_control_view.GetCode(
-            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                return False
-            information_response_view = l2cap_packets.InformationResponseView(
-                l2cap_control_view)
-            if information_response_view.GetInfoType(
-            ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
-                return False
-            extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
-                information_response_view)
-            return extended_features_view.GetFcsOption()
-
         assertThat(self.cert_l2cap.get_control_channel()).emits(
-            is_correct_information_response)
+            L2capMatchers.InformationResponseExtendedFeatures(
+                supports_fcs=True))
 
     def test_extended_feature_info_response_fixed_channels(self):
         """
@@ -406,27 +350,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             1, information_request)
         self.cert_send_b_frame(information_request_l2cap)
 
-        def is_correct_information_response(l2cap_packet):
-            packet_bytes = l2cap_packet.payload
-            l2cap_view = l2cap_packets.BasicFrameView(
-                bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-            if l2cap_view.GetChannelId() != 1:
-                return False
-            l2cap_control_view = l2cap_packets.ControlView(
-                l2cap_view.GetPayload())
-            if l2cap_control_view.GetCode(
-            ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
-                return False
-            information_response_view = l2cap_packets.InformationResponseView(
-                l2cap_control_view)
-            if information_response_view.GetInfoType(
-            ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
-                return False
-            extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
-                information_response_view)
-            return extended_features_view.GetFixedChannels()
-
-        assertThat(self.cert_acl).emits(is_correct_information_response)
+        assertThat(self.cert_l2cap.get_control_channel()).emits(
+            L2capMatchers.InformationResponseExtendedFeatures(
+                supports_fixed_channels=True))
 
     def test_config_channel_not_use_FCS(self):
         """