From: Zach Johnson Date: Mon, 2 Mar 2020 05:26:18 +0000 (-0800) Subject: Start migrating l2cap packet matchers to their own file X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=ce6c4376441aef8873544c0ed5796072c1443168;p=android-x86%2Fsystem-bt.git Start migrating l2cap packet matchers to their own file Test: cert/run --host --test_filter=L2capTest Change-Id: Ia3aab45455042b10273a0741702faa7ea743c7b4 --- diff --git a/gd/cert/matchers.py b/gd/cert/matchers.py new file mode 100644 index 000000000..c8b0511d8 --- /dev/null +++ b/gd/cert/matchers.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3 +# +# Copyright 2020 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import bluetooth_packets_python3 as bt_packets +from bluetooth_packets_python3 import l2cap_packets +from bluetooth_packets_python3.l2cap_packets import CommandCode + + +class L2capMatchers(object): + + @staticmethod + def ConnectionRequest(): + return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONNECTION_REQUEST) + + @staticmethod + def ConfigurationResponse(): + return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_RESPONSE) + + @staticmethod + def ConfigurationRequest(): + return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.CONFIGURATION_REQUEST) + + @staticmethod + def DisconnectionRequest(): + return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST) + + @staticmethod + def _basic_frame(packet): + if packet is None: + return None + return l2cap_packets.BasicFrameView( + bt_packets.PacketViewLittleEndian(list(packet.payload))) + + @staticmethod + def _control_frame(packet): + frame = L2capMatchers._basic_frame(packet) + if frame is None or frame.GetChannelId() != 1: + return None + return l2cap_packets.ControlView(frame.GetPayload()) + + @staticmethod + def _control_frame_with_code(packet, code): + frame = L2capMatchers._control_frame(packet) + if frame is None or frame.GetCode() != code: + return None + return frame + + @staticmethod + def _is_control_frame_with_code(packet, code): + return L2capMatchers._control_frame_with_code(packet, code) is not None diff --git a/gd/l2cap/classic/cert/l2cap_test.py b/gd/l2cap/classic/cert/l2cap_test.py index 873dd83f3..27c494b90 100644 --- a/gd/l2cap/classic/cert/l2cap_test.py +++ b/gd/l2cap/classic/cert/l2cap_test.py @@ -23,6 +23,7 @@ from cert.truth import assertThat from cert.closable import safeClose from cert.py_l2cap import PyL2cap from cert.py_acl_manager import PyAclManager +from cert.matchers import L2capMatchers from facade import common_pb2 from facade import rootservice_pb2 as facade_rootservice from google.protobuf import empty_pb2 as empty_proto @@ -283,40 +284,6 @@ class L2capTest(GdFacadeOnlyBaseTestClass): return l2cap_packets.BasicFrameView( bt_packets.PacketViewLittleEndian(list(l2cap_packet.payload))) - def get_control_frame(self, l2cap_packet, code): - l2cap_view = self.get_basic_frame(l2cap_packet) - if l2cap_view.GetChannelId() != 1: - return None - l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload()) - if l2cap_control_view.GetCode() != code: - return None - return l2cap_control_view - - def is_correct_connection_request(self, l2cap_packet): - return self.get_control_frame( - l2cap_packet, - l2cap_packets.CommandCode.CONNECTION_REQUEST) is not None - - def is_correct_configuration_response(self, l2cap_packet): - control_frame = self.get_control_frame( - l2cap_packet, l2cap_packets.CommandCode.CONFIGURATION_RESPONSE) - if control_frame is None: - return False - configuration_response_view = l2cap_packets.ConfigurationResponseView( - control_frame) - return configuration_response_view.GetResult( - ) == l2cap_packets.ConfigurationResponseResult.SUCCESS - - def is_correct_configuration_request(self, l2cap_packet): - return self.get_control_frame( - l2cap_packet, - l2cap_packets.CommandCode.CONFIGURATION_REQUEST) is not None - - def is_correct_disconnection_request(self, l2cap_packet): - return self.get_control_frame( - l2cap_packet, - l2cap_packets.CommandCode.DISCONNECTION_REQUEST) is not None - def cert_send_b_frame(self, b_frame): self.cert_acl.send(b_frame.Serialize()) @@ -473,8 +440,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -501,7 +468,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.dut.l2cap.OpenChannel( l2cap_facade_pb2.OpenChannelRequest( remote=self.cert_address, psm=psm)) - assertThat(self.cert_acl).emits(self.is_correct_connection_request) + assertThat(self.cert_acl).emits(L2capMatchers.ConnectionRequest()) def test_accept_disconnect(self): """ @@ -583,9 +550,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass): psm, mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC) - assertThat(self.cert_acl).emits(self.is_correct_configuration_response) + assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse()) assertThat(self.cert_acl).emits( - self.is_correct_configuration_request, at_least_times=2) + L2capMatchers.ConfigurationRequest(), at_least_times=2) def test_respond_to_echo_request(self): """ @@ -753,8 +720,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) @@ -781,8 +748,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) @@ -809,8 +776,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) @@ -838,8 +805,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): dcid = self.scid_to_dcid[scid] assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) @@ -892,8 +859,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): dcid = self.scid_to_dcid[scid] assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() for i in range(3): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( @@ -952,8 +919,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): dcid = self.scid_to_dcid[scid] assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() for i in range(3): i_frame = l2cap_packets.EnhancedInformationFrameBuilder( @@ -994,8 +961,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): dcid = self.scid_to_dcid[scid] assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) @@ -1040,8 +1007,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): dcid = self.scid_to_dcid[scid] assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) @@ -1087,8 +1054,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() self.dut.l2cap.SendDynamicChannelPacket( l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc')) @@ -1118,8 +1085,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1152,8 +1119,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1162,7 +1129,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362 time.sleep(362) - assertThat(self.cert_acl).emits(self.is_correct_disconnection_request) + assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest()) def test_i_frame_transmissions_exceed_max_transmit(self): """ @@ -1184,8 +1151,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1200,7 +1167,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0) self.cert_send_b_frame(s_frame) - assertThat(self.cert_acl).emits(self.is_correct_disconnection_request) + assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest()) def test_respond_to_rej(self): """ @@ -1223,8 +1190,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1267,8 +1234,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1309,8 +1276,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1352,8 +1319,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1395,8 +1362,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1453,8 +1420,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1510,8 +1477,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1569,8 +1536,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass): mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM) assertThat(self.cert_acl).emits( - self.is_correct_configuration_response, - self.is_correct_configuration_request).inAnyOrder() + L2capMatchers.ConfigurationResponse(), + L2capMatchers.ConfigurationRequest()).inAnyOrder() dcid = self.scid_to_dcid[scid] @@ -1628,7 +1595,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass): # TODO: Fix this test. It doesn't work so far with PDL struct - assertThat(self.cert_acl).emits(self.is_correct_configuration_request) + assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationRequest()) asserts.skip("Struct not working") def test_respond_configuration_request_ertm(self): @@ -1651,4 +1618,4 @@ class L2capTest(GdFacadeOnlyBaseTestClass): self.cert_send_b_frame(open_channel_l2cap) # TODO: Verify that the type should be ERTM - assertThat(self.cert_acl).emits(self.is_correct_configuration_response) + assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())