OSDN Git Service

LeL2cap: Add PTS test cases and allow verify from DUT
authorHansong Zhang <hsz@google.com>
Thu, 12 Mar 2020 02:13:53 +0000 (19:13 -0700)
committerHansong Zhang <hsz@google.com>
Fri, 13 Mar 2020 18:22:16 +0000 (11:22 -0700)
Bug: 145707677
Test: cert/run --host
Change-Id: I028566a37491c06bdc89864751e83844a33a894e

gd/cert/matchers.py
gd/cert/py_l2cap.py
gd/l2cap/classic/cert/l2cap_test.py
gd/l2cap/le/cert/cert_le_l2cap.py
gd/l2cap/le/cert/le_l2cap_test.py
gd/l2cap/le/facade.cc

index ac25610..4efcd0a 100644 (file)
@@ -54,6 +54,10 @@ class L2capMatchers(object):
         return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)
 
     @staticmethod
+    def LeCommandReject():
+        return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.COMMAND_REJECT)
+
+    @staticmethod
     def CreditBasedConnectionRequest():
         return lambda packet: L2capMatchers._is_le_control_frame_with_code(packet, LeCommandCode.LE_CREDIT_BASED_CONNECTION_REQUEST)
 
@@ -67,6 +71,10 @@ class L2capMatchers(object):
         return lambda packet: L2capMatchers._is_matching_le_disconnection_request(packet, scid, dcid)
 
     @staticmethod
+    def LeDisconnectionResponse(scid, dcid):
+        return lambda packet: L2capMatchers._is_matching_le_disconnection_response(packet, scid, dcid)
+
+    @staticmethod
     def SFrame(req_seq=None, f=None, s=None, p=None):
         return lambda packet: L2capMatchers._is_matching_supervisory_frame(packet, req_seq, f, s, p)
 
@@ -78,11 +86,20 @@ class L2capMatchers(object):
     def Data(payload):
         return lambda packet: packet.GetPayload().GetBytes() == payload
 
+    @staticmethod
+    def FirstLeIFrame(payload, sdu_size):
+        return lambda packet: L2capMatchers._is_matching_first_le_i_frame(packet, payload, sdu_size)
+
     # this is a hack - should be removed
     @staticmethod
     def PartialData(payload):
         return lambda packet: payload in packet.GetPayload().GetBytes()
 
+    # this is a hack - should be removed
+    @staticmethod
+    def PacketPayloadRawData(payload):
+        return lambda packet: payload in packet.payload
+
     @staticmethod
     def ExtractBasicFrame(scid):
         return lambda packet: L2capMatchers._basic_frame_for(packet, scid)
@@ -151,6 +168,12 @@ class L2capMatchers(object):
         return True
 
     @staticmethod
+    def _is_matching_first_le_i_frame(packet, payload, sdu_size):
+        first_le_i_frame = l2cap_packets.FirstLeInformationFrameView(packet)
+        return first_le_i_frame.GetPayload().GetBytes(
+        ) == payload and first_le_i_frame.GetL2capSduLength() == sdu_size
+
+    @staticmethod
     def _control_frame(packet):
         if packet.GetChannelId() != 1:
             return None
@@ -207,6 +230,16 @@ class L2capMatchers(object):
         ) == dcid
 
     @staticmethod
+    def _is_matching_le_disconnection_response(packet, scid, dcid):
+        frame = L2capMatchers.le_control_frame_with_code(
+            packet, LeCommandCode.DISCONNECTION_RESPONSE)
+        if frame is None:
+            return False
+        response = l2cap_packets.LeDisconnectionResponseView(frame)
+        return response.GetSourceCid() == scid and response.GetDestinationCid(
+        ) == dcid
+
+    @staticmethod
     def _is_matching_le_disconnection_request(packet, scid, dcid):
         frame = L2capMatchers.le_control_frame_with_code(
             packet, LeCommandCode.DISCONNECTION_REQUEST)
index ae4e8c6..913e507 100644 (file)
@@ -17,6 +17,9 @@
 from l2cap.classic import facade_pb2 as l2cap_facade_pb2
 from l2cap.le import facade_pb2 as l2cap_le_facade_pb2
 from bluetooth_packets_python3 import l2cap_packets
+from cert.event_stream import EventStream
+from cert.closable import Closable, safeClose
+from google.protobuf import empty_pb2 as empty_proto
 
 
 class PyL2capChannel(object):
@@ -29,16 +32,15 @@ class PyL2capChannel(object):
         self._device.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))
 
-    def send_le(self, payload):
-        self._device.l2cap_le.SendDynamicChannelPacket(
-            l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))
-
 
-class PyL2cap(object):
+class PyL2cap(Closable):
 
     def __init__(self, device):
         self._device = device
 
+    def close(self):
+        pass
+
     def open_channel(self,
                      psm=0x33,
                      mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
@@ -49,8 +51,33 @@ class PyL2cap(object):
                 psm=psm, retransmission_mode=mode))
         return PyL2capChannel(self._device, psm)
 
+
+class PyLeL2capChannel(object):
+
+    def __init__(self, device, psm):
+        self._device = device
+        self._psm = psm
+
+    def send(self, payload):
+        self._device.l2cap_le.SendDynamicChannelPacket(
+            l2cap_le_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))
+
+
+class PyLeL2cap(Closable):
+
+    def __init__(self, device):
+        self._device = device
+        self.le_l2cap_stream = EventStream(
+            self._device.l2cap_le.FetchL2capData(empty_proto.Empty()))
+
+    def close(self):
+        safeClose(self.le_l2cap_stream)
+
+    def get_le_l2cap_stream(self):
+        return self.le_l2cap_stream
+
     def open_credit_based_flow_control_channel(self, psm=0x33):
         # todo, I don't understand what SetDynamicChannel means?
         self._device.l2cap_le.SetDynamicChannel(
             l2cap_le_facade_pb2.SetEnableDynamicChannelRequest(psm=psm))
-        return PyL2capChannel(self._device, psm)
+        return PyLeL2capChannel(self._device, psm)
index be63ff5..c303fb1 100644 (file)
@@ -63,6 +63,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
 
     def teardown_test(self):
         self.cert_l2cap.close()
+        self.dut_l2cap.close()
         super().teardown_test()
 
     def cert_send_b_frame(self, b_frame):
index 85e5728..e7bd53a 100644 (file)
@@ -46,6 +46,11 @@ class CertLeL2capChannel(IEventStream):
         frame = l2cap_packets.BasicFrameBuilder(self._dcid, packet)
         self._acl.send(frame.Serialize())
 
+    def send_first_le_i_frame(self, sdu_size, packet):
+        frame = l2cap_packets.FirstLeInformationFrameBuilder(
+            self._dcid, sdu_size, packet)
+        self._acl.send(frame.Serialize())
+
     def disconnect_and_verify(self):
         assertThat(self._scid).isNotEqualTo(1)
         self._control_channel.send(
@@ -53,13 +58,16 @@ class CertLeL2capChannel(IEventStream):
                                                       self._scid))
 
         assertThat(self._control_channel).emits(
-            L2capMatchers.DisconnectionResponse(self._scid, self._dcid))
+            L2capMatchers.LeDisconnectionResponse(self._scid, self._dcid))
 
-    def get_scid(self):
-        return self._scid
+    def verify_disconnect_request(self):
+        assertThat(self._control_channel).emits(
+            L2capMatchers.LeDisconnectionRequest(self._dcid, self._scid))
 
-    def get_dcid(self):
-        return self._dcid
+    def send_credits(self, num_credits):
+        self._control_channel.send(
+            l2cap_packets.LeFlowControlCreditBuilder(2, self._scid,
+                                                     num_credits))
 
 
 class CertLeL2cap(Closable):
@@ -94,10 +102,16 @@ class CertLeL2cap(Closable):
             control_channel=None)
         self._get_acl_stream().register_callback(self._handle_control_packet)
 
-    def open_channel(self, signal_id, psm, scid, initial_credit=6):
+    def open_channel(self,
+                     signal_id,
+                     psm,
+                     scid,
+                     mtu=1000,
+                     mps=100,
+                     initial_credit=6):
         self.control_channel.send(
             l2cap_packets.LeCreditBasedConnectionRequestBuilder(
-                signal_id, psm, scid, 2000, 1000, initial_credit))
+                signal_id, psm, scid, mtu, mps, initial_credit))
 
         response = L2capCaptures.CreditBasedConnectionResponse(scid)
         assertThat(self.control_channel).emits(response)
@@ -116,91 +130,6 @@ class CertLeL2cap(Closable):
     def _get_acl_stream(self):
         return self._le_acl_manager.get_le_acl_stream()
 
-    def _on_connection_request_default(self, l2cap_control_view):
-        connection_request_view = l2cap_packets.ConnectionRequestView(
-            l2cap_control_view)
-        sid = connection_request_view.GetIdentifier()
-        cid = connection_request_view.GetSourceCid()
-
-        self.scid_to_dcid[cid] = cid
-
-        connection_response = l2cap_packets.ConnectionResponseBuilder(
-            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
-            l2cap_packets.ConnectionResponseStatus.
-            NO_FURTHER_INFORMATION_AVAILABLE)
-        self.control_channel.send(connection_response)
-        return True
-
-    def _on_connection_response_default(self, l2cap_control_view):
-        connection_response_view = l2cap_packets.ConnectionResponseView(
-            l2cap_control_view)
-        sid = connection_response_view.GetIdentifier()
-        scid = connection_response_view.GetSourceCid()
-        dcid = connection_response_view.GetDestinationCid()
-        self.scid_to_dcid[scid] = dcid
-
-        options = []
-        if self.ertm_option is not None:
-            options.append(self.ertm_option)
-        if self.fcs_option is not None:
-            options.append(self.fcs_option)
-
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 1, dcid, l2cap_packets.Continuation.END, options)
-        self.control_channel.send(config_request)
-        return True
-
-    def _on_connection_response_configuration_request_with_unknown_options_and_hint(
-            self, l2cap_control_view):
-        connection_response_view = l2cap_packets.ConnectionResponseView(
-            l2cap_control_view)
-        sid = connection_response_view.GetIdentifier()
-        scid = connection_response_view.GetSourceCid()
-        dcid = connection_response_view.GetDestinationCid()
-        self.scid_to_dcid[scid] = dcid
-
-        mtu_opt = l2cap_packets.MtuConfigurationOption()
-        mtu_opt.mtu = 0x1234
-        mtu_opt.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT
-
-        options = [mtu_opt]
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 1, dcid, l2cap_packets.Continuation.END, options)
-        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, config_request)
-
-        byte_array = bytearray(config_request_l2cap.Serialize())
-        ## Modify configuration option type to be a unknown
-        byte_array[12] |= 0x7f
-        self._acl.send(bytes(byte_array))
-        return True
-
-    def _on_connection_response_configuration_request_with_continuation_flag(
-            self, l2cap_control_view):
-        connection_response_view = l2cap_packets.ConnectionResponseView(
-            l2cap_control_view)
-        sid = connection_response_view.GetIdentifier()
-        scid = connection_response_view.GetSourceCid()
-        dcid = connection_response_view.GetDestinationCid()
-        self.scid_to_dcid[scid] = dcid
-
-        mtu_opt = l2cap_packets.MtuConfigurationOption()
-        mtu_opt.mtu = 0x1234
-
-        options = [mtu_opt]
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 1, dcid, l2cap_packets.Continuation.CONTINUE, options)
-
-        self.control_channel.send(config_request)
-
-        flush_timeout_option = l2cap_packets.FlushTimeoutConfigurationOption()
-        flush_timeout_option.flush_timeout = 65535
-        option = [flush_timeout_option]
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 2, dcid, l2cap_packets.Continuation.END, option)
-        self.get_control_channel().send(config_request)
-        return True
-
     def _on_disconnection_request_default(self, l2cap_control_view):
         disconnection_request = l2cap_packets.DisconnectionRequestView(
             l2cap_control_view)
index 0cc511b..1a9d6f7 100644 (file)
@@ -21,7 +21,7 @@ from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
 from cert.event_stream import EventStream
 from cert.truth import assertThat
 from cert.closable import safeClose
-from cert.py_l2cap import PyL2cap
+from cert.py_l2cap import PyLeL2cap
 from cert.py_acl_manager import PyAclManager
 from cert.matchers import L2capMatchers
 from facade import common_pb2 as common
@@ -53,11 +53,12 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
             empty_proto.Empty()).address
         self.cert_address = common.BluetoothAddress(address=self.cert.address)
 
-        self.dut_l2cap = PyL2cap(self.dut)
+        self.dut_l2cap = PyLeL2cap(self.dut)
         self.cert_l2cap = CertLeL2cap(self.cert)
 
     def teardown_test(self):
         self.cert_l2cap.close()
+        self.dut_l2cap.close()
         super().teardown_test()
 
     def _setup_link_from_cert(self):
@@ -90,22 +91,86 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
                                   signal_id=1,
                                   scid=0x0101,
                                   psm=0x33,
+                                  mtu=1000,
+                                  mps=100,
                                   initial_credit=6):
 
         dut_channel = self.dut_l2cap.open_credit_based_flow_control_channel(psm)
-        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid,
-                                                    initial_credit)
+        cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid, mtu,
+                                                    mps, initial_credit)
 
         return (dut_channel, cert_channel)
 
+    def test_segmentation(self):
+        """
+        L2CAP/COS/CFC/BV-01-C
+        """
+        self._setup_link_from_cert()
+        (dut_channel, cert_channel) = self._open_unvalidated_channel(
+            mtu=1000, mps=102)
+        dut_channel.send(b'hello' * 20 + b'world')
+        # The first LeInformation packet contains 2 bytes of SDU size.
+        # The packet is divided into first 100 bytes from 'hellohello....'
+        # and remaining 5 bytes 'world'
+        assertThat(cert_channel).emits(
+            L2capMatchers.FirstLeIFrame(b'hello' * 20, sdu_size=105),
+            L2capMatchers.Data(b'world')).inOrder()
+
+    def test_no_segmentation(self):
+        """
+        L2CAP/COS/CFC/BV-02-C
+        """
+        self._setup_link_from_cert()
+        (dut_channel, cert_channel) = self._open_unvalidated_channel(
+            mtu=1000, mps=202)
+        dut_channel.send(b'hello' * 40)
+        assertThat(cert_channel).emits(
+            L2capMatchers.FirstLeIFrame(b'hello' * 40, sdu_size=200))
+
+    def test_reassembling(self):
+        """
+        L2CAP/COS/CFC/BV-03-C
+        """
+        self._setup_link_from_cert()
+        (dut_channel, cert_channel) = self._open_unvalidated_channel()
+        sdu_size_for_two_sample_packet = 12
+        cert_channel.send_first_le_i_frame(sdu_size_for_two_sample_packet,
+                                           SAMPLE_PACKET)
+        cert_channel.send(SAMPLE_PACKET)
+        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
+            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00' * 2))
+
+    def test_data_receiving(self):
+        """
+        L2CAP/COS/CFC/BV-04-C
+        """
+        self._setup_link_from_cert()
+        (dut_channel, cert_channel) = self._open_unvalidated_channel()
+        cert_channel.send_first_le_i_frame(6, SAMPLE_PACKET)
+        assertThat(self.dut_l2cap.le_l2cap_stream).emits(
+            L2capMatchers.PacketPayloadRawData(b'\x01\x01\x02\x00\x00\x00'))
+
+    def test_reject_unknown_command_in_le_sigling_channel(self):
+        """
+        L2CAP/LE/REJ/BI-01-C
+        """
+        self._setup_link_from_cert()
+        self.cert_l2cap.get_control_channel().send(
+            l2cap_packets.InformationRequestBuilder(
+                2, l2cap_packets.InformationRequestInfoType.
+                EXTENDED_FEATURES_SUPPORTED))
+        assertThat(self.cert_l2cap.get_control_channel()).emits(
+            L2capMatchers.LeCommandReject())
+
     def test_credit_based_connection_response_on_supported_le_psm(self):
         """
         L2CAP/LE/CFC/BV-03-C
         """
         self._setup_link_from_cert()
         (dut_channel, cert_channel) = self._open_unvalidated_channel()
-        dut_channel.send_le(b'hello')
-        assertThat(cert_channel).emits(L2capMatchers.PartialData(b'hello'))
+        dut_channel.send(b'hello')
+        assertThat(cert_channel).emits(
+            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
 
     def test_credit_based_connection_request_unsupported_le_psm(self):
         """
@@ -129,31 +194,31 @@ class LeL2capTest(GdFacadeOnlyBaseTestClass):
         (dut_channel,
          cert_channel) = self._open_unvalidated_channel(initial_credit=0)
         for _ in range(4):
-            dut_channel.send_le(b'hello')
-        self.cert_l2cap.get_control_channel().send(
-            l2cap_packets.LeFlowControlCreditBuilder(2, cert_channel.get_scid(),
-                                                     1))
-        assertThat(cert_channel).emits(L2capMatchers.PartialData(b'hello'))
-        self.cert_l2cap.get_control_channel().send(
-            l2cap_packets.LeFlowControlCreditBuilder(3, cert_channel.get_scid(),
-                                                     1))
-        assertThat(cert_channel).emits(L2capMatchers.PartialData(b'hello'))
-        self.cert_l2cap.get_control_channel().send(
-            l2cap_packets.LeFlowControlCreditBuilder(4, cert_channel.get_scid(),
-                                                     2))
+            dut_channel.send(b'hello')
+        cert_channel.send_credits(1)
+        assertThat(cert_channel).emits(
+            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
+        cert_channel.send_credits(1)
         assertThat(cert_channel).emits(
-            L2capMatchers.PartialData(b'hello'), at_least_times=2)
+            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
+        cert_channel.send_credits(2)
+        assertThat(cert_channel).emits(
+            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5),
+            L2capMatchers.FirstLeIFrame(b'hello', sdu_size=5))
 
-    def test_acredit_exchange_exceed_initial_credits(self):
+    def test_credit_exchange_exceed_initial_credits(self):
         """
         L2CAP/LE/CFC/BI-01-C
         """
         self._setup_link_from_cert()
-        (dut_channel,
-         cert_channel) = self._open_unvalidated_channel(initial_credit=100)
-        self.cert_l2cap.get_control_channel().send(
-            l2cap_packets.LeFlowControlCreditBuilder(2, cert_channel.get_scid(),
-                                                     65500))
-        assertThat(self.cert_l2cap.get_control_channel()).emits(
-            L2capMatchers.LeDisconnectionRequest(cert_channel.get_dcid(),
-                                                 cert_channel.get_scid()))
+        (dut_channel, cert_channel) = self._open_unvalidated_channel()
+        cert_channel.send_credits(65535)
+        cert_channel.verify_disconnect_request()
+
+    def test_disconnection_response(self):
+        """
+        L2CAP/LE/CFC/BV-09-C
+        """
+        self._setup_link_from_cert()
+        (dut_channel, cert_channel) = self._open_unvalidated_channel()
+        cert_channel.disconnect_and_verify()
index a5a4548..d1800f8 100644 (file)
@@ -96,7 +96,13 @@ class L2capLeModuleFacadeService : public L2capLeModuleFacade::Service {
     }
 
     void on_l2cap_service_registration_complete(DynamicChannelManager::RegistrationResult registration_result,
-                                                std::unique_ptr<DynamicChannelService> service) {}
+                                                std::unique_ptr<DynamicChannelService> service) {
+      if (registration_result != DynamicChannelManager::RegistrationResult::SUCCESS) {
+        LOG_ERROR("Service registration failed");
+      } else {
+        service_ = std::move(service);
+      }
+    }
 
     // invoked from Facade Handler
     void on_connection_open(std::unique_ptr<DynamicChannel> channel) {