OSDN Git Service

Start migrating l2cap packet matchers to their own file
authorZach Johnson <zachoverflow@google.com>
Mon, 2 Mar 2020 05:26:18 +0000 (21:26 -0800)
committerZach Johnson <zachoverflow@google.com>
Mon, 2 Mar 2020 05:27:57 +0000 (21:27 -0800)
Test: cert/run --host --test_filter=L2capTest
Change-Id: Ia3aab45455042b10273a0741702faa7ea743c7b4

gd/cert/matchers.py [new file with mode: 0644]
gd/l2cap/classic/cert/l2cap_test.py

diff --git a/gd/cert/matchers.py b/gd/cert/matchers.py
new file mode 100644 (file)
index 0000000..c8b0511
--- /dev/null
@@ -0,0 +1,63 @@
+#!/usr/bin/env python3
+#
+#   Copyright 2020 - The Android Open Source Project
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+import bluetooth_packets_python3 as bt_packets
+from bluetooth_packets_python3 import l2cap_packets
+from bluetooth_packets_python3.l2cap_packets import CommandCode
+
+
+class L2capMatchers(object):
+
+    @staticmethod
+    def ConnectionRequest():
+        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST)
+
+    @staticmethod
+    def ConfigurationResponse():
+        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE)
+
+    @staticmethod
+    def ConfigurationRequest():
+        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST)
+
+    @staticmethod
+    def DisconnectionRequest():
+        return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
+
+    @staticmethod
+    def _basic_frame(packet):
+        if packet is None:
+            return None
+        return l2cap_packets.BasicFrameView(
+            bt_packets.PacketViewLittleEndian(list(packet.payload)))
+
+    @staticmethod
+    def _control_frame(packet):
+        frame = L2capMatchers._basic_frame(packet)
+        if frame is None or frame.GetChannelId() != 1:
+            return None
+        return l2cap_packets.ControlView(frame.GetPayload())
+
+    @staticmethod
+    def _control_frame_with_code(packet, code):
+        frame = L2capMatchers._control_frame(packet)
+        if frame is None or frame.GetCode() != code:
+            return None
+        return frame
+
+    @staticmethod
+    def _is_control_frame_with_code(packet, code):
+        return L2capMatchers._control_frame_with_code(packet, code) is not None
index 873dd83..27c494b 100644 (file)
@@ -23,6 +23,7 @@ from cert.truth import assertThat
 from cert.closable import safeClose
 from cert.py_l2cap import PyL2cap
 from cert.py_acl_manager import PyAclManager
+from cert.matchers import L2capMatchers
 from facade import common_pb2
 from facade import rootservice_pb2 as facade_rootservice
 from google.protobuf import empty_pb2 as empty_proto
@@ -283,40 +284,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         return l2cap_packets.BasicFrameView(
             bt_packets.PacketViewLittleEndian(list(l2cap_packet.payload)))
 
-    def get_control_frame(self, l2cap_packet, code):
-        l2cap_view = self.get_basic_frame(l2cap_packet)
-        if l2cap_view.GetChannelId() != 1:
-            return None
-        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
-        if l2cap_control_view.GetCode() != code:
-            return None
-        return l2cap_control_view
-
-    def is_correct_connection_request(self, l2cap_packet):
-        return self.get_control_frame(
-            l2cap_packet,
-            l2cap_packets.CommandCode.CONNECTION_REQUEST) is not None
-
-    def is_correct_configuration_response(self, l2cap_packet):
-        control_frame = self.get_control_frame(
-            l2cap_packet, l2cap_packets.CommandCode.CONFIGURATION_RESPONSE)
-        if control_frame is None:
-            return False
-        configuration_response_view = l2cap_packets.ConfigurationResponseView(
-            control_frame)
-        return configuration_response_view.GetResult(
-        ) == l2cap_packets.ConfigurationResponseResult.SUCCESS
-
-    def is_correct_configuration_request(self, l2cap_packet):
-        return self.get_control_frame(
-            l2cap_packet,
-            l2cap_packets.CommandCode.CONFIGURATION_REQUEST) is not None
-
-    def is_correct_disconnection_request(self, l2cap_packet):
-        return self.get_control_frame(
-            l2cap_packet,
-            l2cap_packets.CommandCode.DISCONNECTION_REQUEST) is not None
-
     def cert_send_b_frame(self, b_frame):
         self.cert_acl.send(b_frame.Serialize())
 
@@ -473,8 +440,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -501,7 +468,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self.dut.l2cap.OpenChannel(
             l2cap_facade_pb2.OpenChannelRequest(
                 remote=self.cert_address, psm=psm))
-        assertThat(self.cert_acl).emits(self.is_correct_connection_request)
+        assertThat(self.cert_acl).emits(L2capMatchers.ConnectionRequest())
 
     def test_accept_disconnect(self):
         """
@@ -583,9 +550,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             psm,
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)
 
-        assertThat(self.cert_acl).emits(self.is_correct_configuration_response)
+        assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_request, at_least_times=2)
+            L2capMatchers.ConfigurationRequest(), at_least_times=2)
 
     def test_respond_to_echo_request(self):
         """
@@ -753,8 +720,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -781,8 +748,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -809,8 +776,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -838,8 +805,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         dcid = self.scid_to_dcid[scid]
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -892,8 +859,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         dcid = self.scid_to_dcid[scid]
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         for i in range(3):
             i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
@@ -952,8 +919,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         dcid = self.scid_to_dcid[scid]
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         for i in range(3):
             i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
@@ -994,8 +961,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         dcid = self.scid_to_dcid[scid]
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1040,8 +1007,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         dcid = self.scid_to_dcid[scid]
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1087,8 +1054,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1118,8 +1085,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1152,8 +1119,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1162,7 +1129,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
 
         # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
         time.sleep(362)
-        assertThat(self.cert_acl).emits(self.is_correct_disconnection_request)
+        assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())
 
     def test_i_frame_transmissions_exceed_max_transmit(self):
         """
@@ -1184,8 +1151,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1200,7 +1167,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
         self.cert_send_b_frame(s_frame)
 
-        assertThat(self.cert_acl).emits(self.is_correct_disconnection_request)
+        assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())
 
     def test_respond_to_rej(self):
         """
@@ -1223,8 +1190,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1267,8 +1234,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1309,8 +1276,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1352,8 +1319,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1395,8 +1362,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1453,8 +1420,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1510,8 +1477,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1569,8 +1536,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
         assertThat(self.cert_acl).emits(
-            self.is_correct_configuration_response,
-            self.is_correct_configuration_request).inAnyOrder()
+            L2capMatchers.ConfigurationResponse(),
+            L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
         dcid = self.scid_to_dcid[scid]
 
@@ -1628,7 +1595,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
 
         # TODO: Fix this test. It doesn't work so far with PDL struct
 
-        assertThat(self.cert_acl).emits(self.is_correct_configuration_request)
+        assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationRequest())
         asserts.skip("Struct not working")
 
     def test_respond_configuration_request_ertm(self):
@@ -1651,4 +1618,4 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self.cert_send_b_frame(open_channel_l2cap)
 
         # TODO: Verify that the type should be ERTM
-        assertThat(self.cert_acl).emits(self.is_correct_configuration_response)
+        assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())