OSDN Git Service

Start CertL2cap, an abstraction to make L2cap testing simpler
authorZach Johnson <zachoverflow@google.com>
Mon, 2 Mar 2020 21:11:33 +0000 (13:11 -0800)
committerZach Johnson <zachoverflow@google.com>
Tue, 3 Mar 2020 04:48:49 +0000 (20:48 -0800)
Will contain all the l2cap logic, to make the test files
just the important, simple expression of tests rather than
all the support to run a layer.

Test: cert/run --host --test_filter=L2capTest
Change-Id: Ic880d3f936daccad37fb77521b9240028047ca44

gd/cert/py_l2cap.py
gd/l2cap/classic/cert/cert_l2cap.py [new file with mode: 0644]
gd/l2cap/classic/cert/l2cap_test.py

index d332d7e..0246dfa 100644 (file)
@@ -18,16 +18,28 @@ from l2cap.classic import facade_pb2 as l2cap_facade_pb2
 from bluetooth_packets_python3 import l2cap_packets
 
 
+class PyL2capChannel(object):
+
+    def __init__(self, device, psm):
+        self._device = device
+        self._psm = psm
+
+    def send(self, payload):
+        self._device.l2cap.SendDynamicChannelPacket(
+            l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=payload))
+
+
 class PyL2cap(object):
 
     def __init__(self, device):
-        self.device = device
+        self._device = device
 
     def open_channel(self,
                      psm=0x33,
                      mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
 
         # todo, I don't understand what SetDynamicChannel means?
-        self.device.l2cap.SetDynamicChannel(
+        self._device.l2cap.SetDynamicChannel(
             l2cap_facade_pb2.SetEnableDynamicChannelRequest(
                 psm=psm, retransmission_mode=mode))
+        return PyL2capChannel(self._device, psm)
diff --git a/gd/l2cap/classic/cert/cert_l2cap.py b/gd/l2cap/classic/cert/cert_l2cap.py
new file mode 100644 (file)
index 0000000..7b1ac0b
--- /dev/null
@@ -0,0 +1,348 @@
+#!/usr/bin/env python3
+#
+#   Copyright 2020 - The Android Open Source Project
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+
+from cert.closable import Closable
+from cert.closable import safeClose
+from cert.py_acl_manager import PyAclManager
+from cert.truth import assertThat
+import bluetooth_packets_python3 as bt_packets
+from bluetooth_packets_python3 import l2cap_packets
+from bluetooth_packets_python3.l2cap_packets import CommandCode
+from cert.event_stream import FilteringEventStream
+from cert.event_stream import IEventStream
+from cert.matchers import L2capMatchers
+
+
+class CertL2capChannel(IEventStream):
+
+    def __init__(self, device, scid, acl_stream):
+        self._device = device
+        self._scid = scid
+        self._our_acl_view = acl_stream
+
+    def get_event_queue(self):
+        return self._our_acl_view.get_event_queue()
+
+
+class CertL2cap(Closable):
+
+    def __init__(self, device):
+        self._device = device
+        self._acl_manager = PyAclManager(device)
+        self._acl = None
+
+        self.control_table = {
+            CommandCode.CONNECTION_REQUEST:
+            self._on_connection_request_default,
+            CommandCode.CONNECTION_RESPONSE:
+            self._on_connection_response_default,
+            CommandCode.CONFIGURATION_REQUEST:
+            self._on_configuration_request_default,
+            CommandCode.CONFIGURATION_RESPONSE:
+            self._on_configuration_response_default,
+            CommandCode.DISCONNECTION_REQUEST:
+            self._on_disconnection_request_default,
+            CommandCode.DISCONNECTION_RESPONSE:
+            self._on_disconnection_response_default,
+            CommandCode.INFORMATION_REQUEST:
+            self._on_information_request_default,
+            CommandCode.INFORMATION_RESPONSE:
+            self._on_information_response_default
+        }
+
+        self.scid_to_dcid = {}
+        self.ertm_tx_window_size = 10
+        self.ertm_max_transmit = 20
+
+    def close(self):
+        self._acl_manager.close()
+        safeClose(self._acl)
+
+    def connect_acl(self, remote_addr):
+        self._acl = self._acl_manager.initiate_connection(remote_addr)
+        self._acl.wait_for_connection_complete()
+        self.get_acl_stream().register_callback(self._handle_control_packet)
+
+    def open_channel(self, signal_id, psm, scid):
+        # what is the 1 here for?
+        open_channel = l2cap_packets.BasicFrameBuilder(
+            1, l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid))
+        self.send_acl(open_channel)
+
+        assertThat(self._acl).emits(L2capMatchers.ConnectionResponse(scid))
+        return CertL2capChannel(self._device, scid, self.get_acl_stream())
+
+    # prefer to use channel abstraction instead, if at all possible
+    def send_acl(self, packet):
+        self._acl.send(packet.Serialize())
+
+    # temporary until clients migrated
+    def get_acl_stream(self):
+        return self._acl_manager.get_acl_stream()
+
+    # temporary until clients migrated
+    def get_acl(self):
+        return self._acl
+
+    # temporary until clients migrated
+    def get_dcid(self, scid):
+        return self.scid_to_dcid[scid]
+
+    # more of a hack for the moment
+    def turn_on_ertm(self, tx_window_size=10, max_transmit=20):
+        self.ertm_tx_window_size = tx_window_size
+        self.ertm_max_transmit = max_transmit
+        self.control_table[
+            CommandCode.
+            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+
+    # more of a hack for the moment
+    def turn_on_ertm_and_fcs(self):
+        self.control_table[
+            CommandCode.
+            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm_and_fcs
+
+    # more of a hack for the moment
+    def ignore_config_and_connections(self):
+        self.control_table[CommandCode.CONFIGURATION_REQUEST] = lambda _: True
+        self.control_table[CommandCode.CONNECTION_RESPONSE] = lambda _: True
+
+    # more of a hack for the moment
+    def reply_with_unacceptable_parameters(self):
+        self.control_table[
+            CommandCode.
+            CONFIGURATION_REQUEST] = self._on_configuration_request_unacceptable_parameters
+
+    # more of a hack for the moment
+    def reply_with_unknown_options_and_hint(self):
+        self.control_table[
+            CommandCode.
+            CONNECTION_RESPONSE] = self._on_connection_response_configuration_request_with_unknown_options_and_hint
+
+    def _on_connection_request_default(self, l2cap_control_view):
+        connection_request_view = l2cap_packets.ConnectionRequestView(
+            l2cap_control_view)
+        sid = connection_request_view.GetIdentifier()
+        cid = connection_request_view.GetSourceCid()
+
+        self.scid_to_dcid[cid] = cid
+
+        connection_response = l2cap_packets.ConnectionResponseBuilder(
+            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
+            l2cap_packets.ConnectionResponseStatus.
+            NO_FURTHER_INFORMATION_AVAILABLE)
+        connection_response_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, connection_response)
+        self.send_acl(connection_response_l2cap)
+        return True
+
+    def _on_connection_response_default(self, l2cap_control_view):
+        connection_response_view = l2cap_packets.ConnectionResponseView(
+            l2cap_control_view)
+        sid = connection_response_view.GetIdentifier()
+        scid = connection_response_view.GetSourceCid()
+        dcid = connection_response_view.GetDestinationCid()
+        self.scid_to_dcid[scid] = dcid
+
+        config_request = l2cap_packets.ConfigurationRequestBuilder(
+            sid + 1, dcid, l2cap_packets.Continuation.END, [])
+        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, config_request)
+        self.send_acl(config_request_l2cap)
+        return True
+
+    def _on_connection_response_use_ertm(self, l2cap_control_view):
+        connection_response_view = l2cap_packets.ConnectionResponseView(
+            l2cap_control_view)
+        sid = connection_response_view.GetIdentifier()
+        scid = connection_response_view.GetSourceCid()
+        dcid = connection_response_view.GetDestinationCid()
+        self.scid_to_dcid[scid] = dcid
+
+        # FIXME: This doesn't work!
+        ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
+        )
+        ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
+        ertm_option.tx_window_size = self.ertm_tx_window_size
+        ertm_option.max_transmit = self.ertm_max_transmit
+        ertm_option.retransmission_time_out = 2000
+        ertm_option.monitor_time_out = 12000
+        ertm_option.maximum_pdu_size = 1010
+
+        options = [ertm_option]
+
+        config_request = l2cap_packets.ConfigurationRequestBuilder(
+            sid + 1, dcid, l2cap_packets.Continuation.END, options)
+
+        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, config_request)
+
+        self.send_acl(config_request_l2cap)
+        return True
+
+    def _on_connection_response_use_ertm_and_fcs(self, l2cap_control_view):
+        connection_response_view = l2cap_packets.ConnectionResponseView(
+            l2cap_control_view)
+        sid = connection_response_view.GetIdentifier()
+        scid = connection_response_view.GetSourceCid()
+        dcid = connection_response_view.GetDestinationCid()
+        self.scid_to_dcid[scid] = dcid
+
+        # FIXME: This doesn't work!
+        ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
+        )
+        ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
+        ertm_option.tx_window_size = self.ertm_tx_window_size
+        ertm_option.max_transmit = self.ertm_max_transmit
+        ertm_option.retransmission_time_out = 2000
+        ertm_option.monitor_time_out = 12000
+        ertm_option.maximum_pdu_size = 1010
+
+        fcs_option = l2cap_packets.FrameCheckSequenceOption()
+        fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT
+
+        options = [ertm_option, fcs_option]
+
+        config_request = l2cap_packets.ConfigurationRequestBuilder(
+            sid + 1, dcid, l2cap_packets.Continuation.END, options)
+
+        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, config_request)
+
+        self.send_acl(config_request_l2cap)
+        return True
+
+    def _on_connection_response_configuration_request_with_unknown_options_and_hint(
+            self, l2cap_control_view):
+        connection_response_view = l2cap_packets.ConnectionResponseView(
+            l2cap_control_view)
+        sid = connection_response_view.GetIdentifier()
+        scid = connection_response_view.GetSourceCid()
+        dcid = connection_response_view.GetDestinationCid()
+        self.scid_to_dcid[scid] = dcid
+
+        mtu_opt = l2cap_packets.MtuConfigurationOption()
+        mtu_opt.mtu = 0x1234
+        mtu_opt.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT
+
+        options = [mtu_opt]
+        config_request = l2cap_packets.ConfigurationRequestBuilder(
+            sid + 1, dcid, l2cap_packets.Continuation.END, options)
+        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, config_request)
+
+        byte_array = bytearray(config_request_l2cap.Serialize())
+        ## Modify configuration option type to be a unknown
+        byte_array[12] |= 0x7f
+        self._acl.send(bytes(byte_array))
+        return True
+
+    def _on_configuration_request_default(self, l2cap_control_view):
+        configuration_request = l2cap_packets.ConfigurationRequestView(
+            l2cap_control_view)
+        sid = configuration_request.GetIdentifier()
+        dcid = configuration_request.GetDestinationCid()
+        config_response = l2cap_packets.ConfigurationResponseBuilder(
+            sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END,
+            l2cap_packets.ConfigurationResponseResult.SUCCESS, [])
+        config_response_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, config_response)
+        self.send_acl(config_response_l2cap)
+
+    def _on_configuration_request_unacceptable_parameters(
+            self, l2cap_control_view):
+        configuration_request = l2cap_packets.ConfigurationRequestView(
+            l2cap_control_view)
+        sid = configuration_request.GetIdentifier()
+        dcid = configuration_request.GetDestinationCid()
+
+        mtu_opt = l2cap_packets.MtuConfigurationOption()
+        mtu_opt.mtu = 123
+        fcs_opt = l2cap_packets.FrameCheckSequenceOption()
+        fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
+        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
+        )
+        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
+
+        config_response = l2cap_packets.ConfigurationResponseBuilder(
+            sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END,
+            l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS,
+            [mtu_opt, fcs_opt, rfc_opt])
+        config_response_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, config_response)
+        self.send_acl(config_response_l2cap)
+
+    def _on_configuration_response_default(self, l2cap_control_view):
+        configuration_response = l2cap_packets.ConfigurationResponseView(
+            l2cap_control_view)
+        sid = configuration_response.GetIdentifier()
+
+    def _on_disconnection_request_default(self, l2cap_control_view):
+        disconnection_request = l2cap_packets.DisconnectionRequestView(
+            l2cap_control_view)
+        sid = disconnection_request.GetIdentifier()
+        scid = disconnection_request.GetSourceCid()
+        dcid = disconnection_request.GetDestinationCid()
+        disconnection_response = l2cap_packets.DisconnectionResponseBuilder(
+            sid, dcid, scid)
+        disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder(
+            1, disconnection_response)
+        self.send_acl(disconnection_response_l2cap)
+
+    def _on_disconnection_response_default(self, l2cap_control_view):
+        disconnection_response = l2cap_packets.DisconnectionResponseView(
+            l2cap_control_view)
+
+    def _on_information_request_default(self, l2cap_control_view):
+        information_request = l2cap_packets.InformationRequestView(
+            l2cap_control_view)
+        sid = information_request.GetIdentifier()
+        information_type = information_request.GetInfoType()
+        if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU:
+            response = l2cap_packets.InformationResponseConnectionlessMtuBuilder(
+                sid, l2cap_packets.InformationRequestResult.SUCCESS, 100)
+            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
+            self.send_acl(response_l2cap)
+            return
+        if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
+            response = l2cap_packets.InformationResponseExtendedFeaturesBuilder(
+                sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1,
+                0, 1, 0, 0, 0, 0)
+            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
+            self.send_acl(response_l2cap)
+            return
+        if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED:
+            response = l2cap_packets.InformationResponseFixedChannelsBuilder(
+                sid, l2cap_packets.InformationRequestResult.SUCCESS, 2)
+            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
+            self.send_acl(response_l2cap)
+            return
+
+    def _on_information_response_default(self, l2cap_control_view):
+        information_response = l2cap_packets.InformationResponseView(
+            l2cap_control_view)
+
+    def _handle_control_packet(self, l2cap_packet):
+        packet_bytes = l2cap_packet.payload
+        l2cap_view = l2cap_packets.BasicFrameView(
+            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+        if l2cap_view.GetChannelId() != 1:
+            return
+        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
+        fn = self.control_table.get(l2cap_control_view.GetCode())
+        if fn is not None:
+            fn(l2cap_control_view)
+        return
index 5e94570..4ac4669 100644 (file)
@@ -33,6 +33,7 @@ from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
 import bluetooth_packets_python3 as bt_packets
 from bluetooth_packets_python3 import hci_packets, l2cap_packets
 from bluetooth_packets_python3.l2cap_packets import CommandCode
+from cert_l2cap import CertL2cap
 
 # Assemble a sample packet. TODO: Use RawBuilder
 SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
@@ -58,255 +59,14 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self.dut.neighbor.EnablePageScan(
             neighbor_facade.EnableMsg(enabled=True))
 
-        self.control_table = {
-            CommandCode.CONNECTION_REQUEST:
-            self._on_connection_request_default,
-            CommandCode.CONNECTION_RESPONSE:
-            self._on_connection_response_default,
-            CommandCode.CONFIGURATION_REQUEST:
-            self._on_configuration_request_default,
-            CommandCode.CONFIGURATION_RESPONSE:
-            self._on_configuration_response_default,
-            CommandCode.DISCONNECTION_REQUEST:
-            self._on_disconnection_request_default,
-            CommandCode.DISCONNECTION_RESPONSE:
-            self._on_disconnection_response_default,
-            CommandCode.INFORMATION_REQUEST:
-            self._on_information_request_default,
-            CommandCode.INFORMATION_RESPONSE:
-            self._on_information_response_default
-        }
-
-        self.scid_to_dcid = {}
-        self.ertm_tx_window_size = 10
-        self.ertm_max_transmit = 20
-
         self.dut_l2cap = PyL2cap(self.dut)
-        self.cert_acl_manager = PyAclManager(self.cert)
+        self.cert_l2cap = CertL2cap(self.cert)
         self.cert_acl = None
 
     def teardown_test(self):
-        self.cert_acl_manager.close()
-        safeClose(self.cert_acl)
+        self.cert_l2cap.close()
         super().teardown_test()
 
-    def _on_connection_request_default(self, l2cap_control_view):
-        connection_request_view = l2cap_packets.ConnectionRequestView(
-            l2cap_control_view)
-        sid = connection_request_view.GetIdentifier()
-        cid = connection_request_view.GetSourceCid()
-
-        self.scid_to_dcid[cid] = cid
-
-        connection_response = l2cap_packets.ConnectionResponseBuilder(
-            sid, cid, cid, l2cap_packets.ConnectionResponseResult.SUCCESS,
-            l2cap_packets.ConnectionResponseStatus.
-            NO_FURTHER_INFORMATION_AVAILABLE)
-        connection_response_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, connection_response)
-        self.cert_acl.send(connection_response_l2cap.Serialize())
-        return True
-
-    def _on_connection_response_default(self, l2cap_control_view):
-        connection_response_view = l2cap_packets.ConnectionResponseView(
-            l2cap_control_view)
-        sid = connection_response_view.GetIdentifier()
-        scid = connection_response_view.GetSourceCid()
-        dcid = connection_response_view.GetDestinationCid()
-        self.scid_to_dcid[scid] = dcid
-
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 1, dcid, l2cap_packets.Continuation.END, [])
-        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, config_request)
-        self.cert_acl.send(config_request_l2cap.Serialize())
-        return True
-
-    def _on_connection_response_use_ertm(self, l2cap_control_view):
-        connection_response_view = l2cap_packets.ConnectionResponseView(
-            l2cap_control_view)
-        sid = connection_response_view.GetIdentifier()
-        scid = connection_response_view.GetSourceCid()
-        dcid = connection_response_view.GetDestinationCid()
-        self.scid_to_dcid[scid] = dcid
-
-        # FIXME: This doesn't work!
-        ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
-        )
-        ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
-        ertm_option.tx_window_size = self.ertm_tx_window_size
-        ertm_option.max_transmit = self.ertm_max_transmit
-        ertm_option.retransmission_time_out = 2000
-        ertm_option.monitor_time_out = 12000
-        ertm_option.maximum_pdu_size = 1010
-
-        options = [ertm_option]
-
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 1, dcid, l2cap_packets.Continuation.END, options)
-
-        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, config_request)
-
-        self.cert_acl.send(config_request_l2cap.Serialize())
-        return True
-
-    def _on_connection_response_use_ertm_and_fcs(self, l2cap_control_view):
-        connection_response_view = l2cap_packets.ConnectionResponseView(
-            l2cap_control_view)
-        sid = connection_response_view.GetIdentifier()
-        scid = connection_response_view.GetSourceCid()
-        dcid = connection_response_view.GetDestinationCid()
-        self.scid_to_dcid[scid] = dcid
-
-        # FIXME: This doesn't work!
-        ertm_option = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
-        )
-        ertm_option.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
-        ertm_option.tx_window_size = self.ertm_tx_window_size
-        ertm_option.max_transmit = self.ertm_max_transmit
-        ertm_option.retransmission_time_out = 2000
-        ertm_option.monitor_time_out = 12000
-        ertm_option.maximum_pdu_size = 1010
-
-        fcs_option = l2cap_packets.FrameCheckSequenceOption()
-        fcs_option.fcs_type = l2cap_packets.FcsType.DEFAULT
-
-        options = [ertm_option, fcs_option]
-
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 1, dcid, l2cap_packets.Continuation.END, options)
-
-        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, config_request)
-
-        self.cert_acl.send(config_request_l2cap.Serialize())
-        return True
-
-    def _on_connection_response_configuration_request_with_unknown_options_and_hint(
-            self, l2cap_control_view):
-        connection_response_view = l2cap_packets.ConnectionResponseView(
-            l2cap_control_view)
-        sid = connection_response_view.GetIdentifier()
-        scid = connection_response_view.GetSourceCid()
-        dcid = connection_response_view.GetDestinationCid()
-        self.scid_to_dcid[scid] = dcid
-
-        mtu_opt = l2cap_packets.MtuConfigurationOption()
-        mtu_opt.mtu = 0x1234
-        mtu_opt.is_hint = l2cap_packets.ConfigurationOptionIsHint.OPTION_IS_A_HINT
-
-        options = [mtu_opt]
-        config_request = l2cap_packets.ConfigurationRequestBuilder(
-            sid + 1, dcid, l2cap_packets.Continuation.END, options)
-        config_request_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, config_request)
-
-        byte_array = bytearray(config_request_l2cap.Serialize())
-        ## Modify configuration option type to be a unknown
-        byte_array[12] |= 0x7f
-        self.cert.hci_acl_manager.SendAclData(
-            acl_manager_facade.AclData(
-                handle=self.cert_acl_handle, payload=bytes(byte_array)))
-        return True
-
-    def _on_configuration_request_default(self, l2cap_control_view):
-        configuration_request = l2cap_packets.ConfigurationRequestView(
-            l2cap_control_view)
-        sid = configuration_request.GetIdentifier()
-        dcid = configuration_request.GetDestinationCid()
-        config_response = l2cap_packets.ConfigurationResponseBuilder(
-            sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END,
-            l2cap_packets.ConfigurationResponseResult.SUCCESS, [])
-        config_response_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, config_response)
-        self.cert_send_b_frame(config_response_l2cap)
-
-    def _on_configuration_request_unacceptable_parameters(
-            self, l2cap_control_view):
-        configuration_request = l2cap_packets.ConfigurationRequestView(
-            l2cap_control_view)
-        sid = configuration_request.GetIdentifier()
-        dcid = configuration_request.GetDestinationCid()
-
-        mtu_opt = l2cap_packets.MtuConfigurationOption()
-        mtu_opt.mtu = 123
-        fcs_opt = l2cap_packets.FrameCheckSequenceOption()
-        fcs_opt.fcs_type = l2cap_packets.FcsType.DEFAULT
-        rfc_opt = l2cap_packets.RetransmissionAndFlowControlConfigurationOption(
-        )
-        rfc_opt.mode = l2cap_packets.RetransmissionAndFlowControlModeOption.L2CAP_BASIC
-
-        config_response = l2cap_packets.ConfigurationResponseBuilder(
-            sid, self.scid_to_dcid.get(dcid, 0), l2cap_packets.Continuation.END,
-            l2cap_packets.ConfigurationResponseResult.UNACCEPTABLE_PARAMETERS,
-            [mtu_opt, fcs_opt, rfc_opt])
-        config_response_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, config_response)
-        self.cert_send_b_frame(config_response_l2cap)
-
-    def _on_configuration_response_default(self, l2cap_control_view):
-        configuration_response = l2cap_packets.ConfigurationResponseView(
-            l2cap_control_view)
-        sid = configuration_response.GetIdentifier()
-
-    def _on_disconnection_request_default(self, l2cap_control_view):
-        disconnection_request = l2cap_packets.DisconnectionRequestView(
-            l2cap_control_view)
-        sid = disconnection_request.GetIdentifier()
-        scid = disconnection_request.GetSourceCid()
-        dcid = disconnection_request.GetDestinationCid()
-        disconnection_response = l2cap_packets.DisconnectionResponseBuilder(
-            sid, dcid, scid)
-        disconnection_response_l2cap = l2cap_packets.BasicFrameBuilder(
-            1, disconnection_response)
-        self.cert_acl.send(disconnection_response_l2cap.Serialize())
-
-    def _on_disconnection_response_default(self, l2cap_control_view):
-        disconnection_response = l2cap_packets.DisconnectionResponseView(
-            l2cap_control_view)
-
-    def _on_information_request_default(self, l2cap_control_view):
-        information_request = l2cap_packets.InformationRequestView(
-            l2cap_control_view)
-        sid = information_request.GetIdentifier()
-        information_type = information_request.GetInfoType()
-        if information_type == l2cap_packets.InformationRequestInfoType.CONNECTIONLESS_MTU:
-            response = l2cap_packets.InformationResponseConnectionlessMtuBuilder(
-                sid, l2cap_packets.InformationRequestResult.SUCCESS, 100)
-            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
-            self.cert_acl.send(response_l2cap.Serialize())
-            return
-        if information_type == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
-            response = l2cap_packets.InformationResponseExtendedFeaturesBuilder(
-                sid, l2cap_packets.InformationRequestResult.SUCCESS, 0, 0, 0, 1,
-                0, 1, 0, 0, 0, 0)
-            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
-            self.cert_acl.send(response_l2cap.Serialize())
-            return
-        if information_type == l2cap_packets.InformationRequestInfoType.FIXED_CHANNELS_SUPPORTED:
-            response = l2cap_packets.InformationResponseFixedChannelsBuilder(
-                sid, l2cap_packets.InformationRequestResult.SUCCESS, 2)
-            response_l2cap = l2cap_packets.BasicFrameBuilder(1, response)
-            self.cert_acl.send(response_l2cap.Serialize())
-            return
-
-    def _on_information_response_default(self, l2cap_control_view):
-        information_response = l2cap_packets.InformationResponseView(
-            l2cap_control_view)
-
-    def _handle_control_packet(self, l2cap_packet):
-        packet_bytes = l2cap_packet.payload
-        l2cap_view = l2cap_packets.BasicFrameView(
-            bt_packets.PacketViewLittleEndian(list(packet_bytes)))
-        if l2cap_view.GetChannelId() != 1:
-            return
-        l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
-        fn = self.control_table.get(l2cap_control_view.GetCode())
-        if fn is not None:
-            fn(l2cap_control_view)
-        return
-
     def cert_send_b_frame(self, b_frame):
         self.cert_acl.send(b_frame.Serialize())
 
@@ -314,11 +74,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
 
         self.dut.neighbor.EnablePageScan(
             neighbor_facade.EnableMsg(enabled=True))
-        self.cert_acl = self.cert_acl_manager.initiate_connection(
-            self.dut.address)
-        self.cert_acl.wait_for_connection_complete()
-        self.cert_acl_manager.get_acl_stream().register_callback(
-            self._handle_control_packet)
+        self.cert_l2cap.connect_acl(self.dut.address)
+        self.cert_acl = self.cert_l2cap.get_acl()
 
     def _open_channel(
             self,
@@ -327,25 +84,17 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             psm=0x33,
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC):
 
-        self.dut_l2cap.open_channel(psm, mode)
-
-        open_channel = l2cap_packets.BasicFrameBuilder(
-            1, l2cap_packets.ConnectionRequestBuilder(signal_id, psm, scid))
-        self.cert_send_b_frame(open_channel)
-
-        assertThat(self.cert_acl_manager.get_acl_stream()).emits(
-            L2capMatchers.ConnectionResponse(scid))
+        self.dut_channel = self.dut_l2cap.open_channel(psm, mode)
+        self.cert_channel = self.cert_l2cap.open_channel(signal_id, psm, scid)
 
     def test_connect_dynamic_channel_and_send_data(self):
         self._setup_link_from_cert()
 
-        psm = 0x33
-        scid = 0x41
-        self._open_channel(1, scid, psm)
-        self.dut.l2cap.SendDynamicChannelPacket(
-            l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+        self._open_channel(signal_id=1, scid=0x41, psm=0x33)
 
-        assertThat(self.cert_acl).emits(lambda packet: b'abc' in packet.payload)
+        self.dut_channel.send(b'abc')
+        assertThat(
+            self.cert_channel).emits(lambda packet: b'abc' in packet.payload)
 
     def test_fixed_channel(self):
         self._setup_link_from_cert()
@@ -356,18 +105,20 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self.dut.l2cap.SendL2capPacket(
             l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
 
-        assertThat(self.cert_acl).emits(lambda packet: b'123' in packet.payload)
+        assertThat(
+            self.cert_channel).emits(lambda packet: b'123' in packet.payload)
 
     def test_receive_packet_from_unknown_channel(self):
         self._setup_link_from_cert()
         psm = 0x33
         scid = 0x41
         self._open_channel(1, scid, psm)
+
         i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
             0x99, 0, l2cap_packets.Final.NOT_SET, 1,
             l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
-        self.cert_send_b_frame(i_frame)
-        assertThat(self.cert_acl).emitsNone(
+        self.cert_l2cap.send_acl(i_frame)
+        assertThat(self.cert_l2cap.get_acl_stream()).emitsNone(
             L2capMatchers.SupervisoryFrame(scid, req_seq=4),
             timeout=timedelta(seconds=1))
 
@@ -379,9 +130,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
 
     def test_connect_and_send_data_ertm_no_segmentation(self):
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -395,12 +144,11 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
-        self.dut.l2cap.SendDynamicChannelPacket(
-            l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc' * 34))
-        assertThat(
-            self.cert_acl).emits(lambda packet: b'abc' * 34 in packet.payload)
+        self.dut_channel.send(b'abc' * 34)
+        assertThat(self.cert_channel).emits(
+            lambda packet: b'abc' * 34 in packet.payload)
 
         i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
             dcid, 0, l2cap_packets.Final.NOT_SET, 1,
@@ -432,7 +180,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         psm = 0x33
         self._open_channel(1, scid, psm)
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         close_channel = l2cap_packets.DisconnectionRequestBuilder(1, dcid, scid)
         close_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, close_channel)
@@ -451,8 +199,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         psm = 0x33
 
         # Don't send configuration request or response back
-        self.control_table[CommandCode.CONFIGURATION_REQUEST] = lambda _: True
-        self.control_table[CommandCode.CONNECTION_RESPONSE] = lambda _: True
+        self.cert_l2cap.ignore_config_and_connections()
 
         self._open_channel(1, scid, psm)
 
@@ -468,9 +215,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         psm = 0x33
         scid = 0x41
 
-        self.control_table[
-            CommandCode.
-            CONFIGURATION_REQUEST] = self._on_configuration_request_unacceptable_parameters
+        self.cert_l2cap.reply_with_unacceptable_parameters()
 
         self._open_channel(
             1,
@@ -487,7 +232,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         L2CAP/COS/CFD/BV-12-C
         """
         self._setup_link_from_cert()
-        self.on_connection_response = self._on_connection_response_configuration_request_with_unknown_options_and_hint
+        self.cert_l2cap.reply_with_unknown_options_and_hint()
 
         self._open_channel(signal_id=1, scid=0x41, psm=0x33)
 
@@ -710,9 +455,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         """
         self._setup_link_from_cert()
 
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -736,12 +479,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         Verify the IUT will include the FCS in I/S-frames if the Lower Tester explicitly requests that FCS
         should be used.
         """
-
         self._setup_link_from_cert()
 
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm_and_fcs
+        self.cert_l2cap.turn_on_ertm_and_fcs()
+
         psm = 0x33
         scid = 0x41
         self._open_channel(
@@ -767,9 +508,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         """
         self._setup_link_from_cert()
 
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm_and_fcs
+        self.cert_l2cap.turn_on_ertm_and_fcs()
+
         psm = 0x33
         scid = 0x41
         self._open_channel(
@@ -794,9 +534,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         """
         self._setup_link_from_cert()
 
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
+
         psm = 0x33
         scid = 0x41
         self._open_channel(
@@ -805,7 +544,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             psm,
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         assertThat(self.cert_acl).emits(
             L2capMatchers.ConfigurationResponse(),
@@ -847,9 +586,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP SDUs to the Upper Tester
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -859,7 +597,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             psm,
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         assertThat(self.cert_acl).emits(
             L2capMatchers.ConfigurationResponse(),
@@ -902,9 +640,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         Lower Tester
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -914,7 +650,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             psm,
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         assertThat(self.cert_acl).emits(
             L2capMatchers.ConfigurationResponse(),
@@ -940,11 +676,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         IUT will resume transmission of I-frames when an S-frame [RR] is received that acknowledges
         previously sent I-frames.
         """
-        self.ertm_tx_window_size = 1
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm(tx_window_size=1)
 
         psm = 0x33
         scid = 0x41
@@ -954,7 +687,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             psm,
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         assertThat(self.cert_acl).emits(
             L2capMatchers.ConfigurationResponse(),
@@ -984,11 +717,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         IUT will resume transmission of I-frames when an I-frame is received that acknowledges previously
         sent I-frames.
         """
-        self.ertm_tx_window_size = 1
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm(tx_window_size=1)
 
         psm = 0x33
         scid = 0x41
@@ -998,7 +728,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             psm,
             mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         assertThat(self.cert_acl).emits(
             L2capMatchers.ConfigurationResponse(),
@@ -1035,9 +765,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         Verify the IUT sends an S-frame [RR] with the Poll bit set when its retransmission timer expires.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1065,9 +793,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         with the Poll bit set.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1081,7 +807,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
             dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
@@ -1099,9 +825,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         """
         asserts.skip("Need to configure DUT to have a shorter timer")
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1115,7 +839,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1131,9 +855,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         not acknowledge the previous I-frame sent by the IUT.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1147,7 +869,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1167,12 +889,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         L2CAP/ERM/BV-13-C [Respond to S-Frame [REJ]]
         Verify the IUT retransmits I-frames starting from the sequence number specified in the S-frame [REJ].
         """
-        self.ertm_tx_window_size = 2
-        self.ertm_max_transmit = 2
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm(tx_window_size=2, max_transmit=2)
 
         psm = 0x33
         scid = 0x41
@@ -1186,7 +904,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1214,9 +932,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         [RR] with the Final Bit set.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1230,7 +946,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1255,9 +971,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         with the final bit set.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1271,7 +985,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1296,9 +1010,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         Lower Tester (S-frame [RNR]).
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1312,7 +1024,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
@@ -1336,11 +1048,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         Verify the IUT can handle receipt of an S-=frame [RR] Poll = 1 if the S-frame [REJ] sent from the IUT
         is lost.
         """
-        self.ertm_tx_window_size = 5
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm(tx_window_size=5)
+        ertm_tx_window_size = 5
 
         psm = 0x33
         scid = 0x41
@@ -1354,7 +1064,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
             dcid, 0, l2cap_packets.Final.NOT_SET, 0,
@@ -1364,7 +1074,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.SupervisoryFrame(scid, req_seq=1))
 
         i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-            dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0,
+            dcid, ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0,
             l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
         self.cert_send_b_frame(i_frame)
         assertThat(self.cert_acl).emits(
@@ -1379,7 +1089,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         assertThat(self.cert_acl).emits(
             L2capMatchers.SupervisoryFrame(
                 scid, req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE))
-        for i in range(1, self.ertm_tx_window_size):
+        for i in range(1, ertm_tx_window_size):
             i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
                 dcid, i, l2cap_packets.Final.NOT_SET, 0,
                 l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
@@ -1394,9 +1104,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         Verify the IUT will only retransmit the requested I-frame once after receiving a duplicate SREJ.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1410,7 +1118,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1449,9 +1157,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         retransmitted.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1465,7 +1171,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1507,9 +1213,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         followed by an I-frame with the Final bit set that indicates the same I-frames should be retransmitted.
         """
         self._setup_link_from_cert()
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41
@@ -1523,7 +1227,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.ConfigurationResponse(),
             L2capMatchers.ConfigurationRequest()).inAnyOrder()
 
-        dcid = self.scid_to_dcid[scid]
+        dcid = self.cert_l2cap.get_dcid(scid)
 
         self.dut.l2cap.SendDynamicChannelPacket(
             l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
@@ -1565,9 +1269,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         """
         self._setup_link_from_cert()
 
-        self.control_table[
-            CommandCode.
-            CONNECTION_RESPONSE] = self._on_connection_response_use_ertm
+        self.cert_l2cap.turn_on_ertm()
 
         psm = 0x33
         scid = 0x41