OSDN Git Service

Add function to send s_frames & clean up sending
authorZach Johnson <zachoverflow@google.com>
Tue, 3 Mar 2020 19:25:37 +0000 (11:25 -0800)
committerZach Johnson <zachoverflow@google.com>
Tue, 3 Mar 2020 22:32:28 +0000 (14:32 -0800)
Test: cert/run --host --test_filter=L2capTest
Change-Id: I3bc7d2f0489a8a99febbc7bf746f355a2c77223d

gd/l2cap/classic/cert/cert_l2cap.py
gd/l2cap/classic/cert/l2cap_test.py

index 00f7040..9fae240 100644 (file)
@@ -23,6 +23,8 @@ from bluetooth_packets_python3 import l2cap_packets
 from bluetooth_packets_python3.l2cap_packets import CommandCode
 from bluetooth_packets_python3.l2cap_packets import Final
 from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
+from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
+from bluetooth_packets_python3.l2cap_packets import Poll
 from cert.event_stream import FilteringEventStream
 from cert.event_stream import IEventStream
 from cert.matchers import L2capMatchers
@@ -57,6 +59,15 @@ class CertL2capChannel(IEventStream):
             self._dcid, tx_seq, f, req_seq, sar, payload)
         self._acl.send(frame.Serialize())
 
+    def send_s_frame(self,
+                     req_seq,
+                     s=SupervisoryFunction.RECEIVER_READY,
+                     p=Poll.NOT_SET,
+                     f=Final.NOT_SET):
+        frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+            self._dcid, s, p, f, req_seq)
+        self._acl.send(frame.Serialize())
+
 
 class CertL2cap(Closable):
 
index e735d0e..7d0accb 100644 (file)
@@ -35,6 +35,8 @@ from bluetooth_packets_python3 import hci_packets, l2cap_packets
 from bluetooth_packets_python3.l2cap_packets import SegmentationAndReassembly
 from bluetooth_packets_python3.l2cap_packets import Final
 from bluetooth_packets_python3.l2cap_packets import CommandCode
+from bluetooth_packets_python3.l2cap_packets import SupervisoryFunction
+from bluetooth_packets_python3.l2cap_packets import Poll
 from cert_l2cap import CertL2cap
 
 # Assemble a sample packet. TODO: Use RawBuilder
@@ -124,7 +126,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._open_channel(scid=0x41, psm=0x33)
 
         i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
-            0x99, 0, l2cap_packets.Final.NOT_SET, 1,
+            0x99, 0, Final.NOT_SET, 1,
             l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
         self.cert_l2cap.send_acl(i_frame)
         assertThat(self.cert_channel).emitsNone(
@@ -569,10 +571,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm(tx_window_size=1)
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
         self.dut_channel.send(b'def')
@@ -581,10 +580,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.IFrame(tx_seq=0, payload=b'abc'))
         assertThat(self.cert_channel).emitsNone(
             L2capMatchers.IFrame(tx_seq=1, payload=b'def'))
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1)
-        self.cert_send_b_frame(s_frame)
+
+        self.cert_channel.send_s_frame(req_seq=1, f=Final.POLL_RESPONSE)
         assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=1))
 
     def test_resume_transmitting_when_acknowledge_previously_sent(self):
@@ -643,18 +640,11 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm()
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
-
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-            l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
-        self.cert_send_b_frame(s_frame)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
+        self.cert_channel.send_s_frame(req_seq=0, p=Poll.POLL)
         assertThat(self.cert_channel).emits(
-            L2capMatchers.SFrame(f=l2cap_packets.Final.POLL_RESPONSE))
+            L2capMatchers.SFrame(f=Final.POLL_RESPONSE))
 
     def test_s_frame_transmissions_exceed_max_transmit(self):
         """
@@ -683,20 +673,12 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm()
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
-
         assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
 
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
-        self.cert_send_b_frame(s_frame)
-
+        self.cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE)
         assertThat(self.cert_l2cap.get_control_channel()).emits(
             L2capMatchers.DisconnectionRequest())
 
@@ -709,9 +691,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self.cert_l2cap.turn_on_ertm(tx_window_size=2, max_transmit=2)
 
         scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
         self.dut_channel.send(b'abc')
@@ -719,10 +699,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             assertThat(self.cert_channel).emits(
                 L2capMatchers.IFrame(tx_seq=i), timeout=timedelta(seconds=0.5))
 
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.REJECT,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-        self.cert_send_b_frame(s_frame)
+        self.cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT)
 
         for i in range(2):
             assertThat(self.cert_channel).emits(
@@ -737,10 +714,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm()
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
 
@@ -749,11 +723,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         assertThat(self.cert_channel).emits(
             L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
 
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
-        self.cert_send_b_frame(s_frame)
-
+        self.cert_channel.send_s_frame(req_seq=0, f=Final.POLL_RESPONSE)
         assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
 
     def test_receive_i_frame_final_bit_set(self):
@@ -771,8 +741,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
 
         # TODO: Always use their retransmission timeout value
         time.sleep(2)
-        assertThat(self.cert_channel).emits(
-            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
+        assertThat(self.cert_channel).emits(L2capMatchers.SFrame(p=Poll.POLL))
 
         self.cert_channel.send_i_frame(
             tx_seq=0, req_seq=0, f=Final.POLL_RESPONSE, payload=SAMPLE_PACKET)
@@ -788,10 +757,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm()
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
 
@@ -800,11 +766,10 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         assertThat(self.cert_channel).emits(
             L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
 
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
-        self.cert_send_b_frame(s_frame)
-
+        self.cert_channel.send_s_frame(
+            req_seq=0,
+            s=SupervisoryFunction.RECEIVER_NOT_READY,
+            f=Final.POLL_RESPONSE)
         assertThat(self.cert_channel).emitsNone(L2capMatchers.IFrame(tx_seq=0))
 
     def test_sent_rej_lost(self):
@@ -817,10 +782,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self.cert_l2cap.turn_on_ertm(tx_window_size=5)
         ertm_tx_window_size = 5
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x41, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x41, use_ertm=True)
 
         self.cert_channel.send_i_frame(
             tx_seq=0, req_seq=0, payload=SAMPLE_PACKET)
@@ -829,12 +791,9 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self.cert_channel.send_i_frame(
             tx_seq=ertm_tx_window_size - 1, req_seq=0, payload=SAMPLE_PACKET)
         assertThat(self.cert_channel).emits(
-            L2capMatchers.SFrame(s=l2cap_packets.SupervisoryFunction.REJECT))
+            L2capMatchers.SFrame(s=SupervisoryFunction.REJECT))
 
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
-            l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
-        self.cert_send_b_frame(s_frame)
+        self.cert_channel.send_s_frame(req_seq=0, p=Poll.POLL)
 
         assertThat(self.cert_channel).emits(
             L2capMatchers.SFrame(
@@ -853,10 +812,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm()
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
         self.dut_channel.send(b'abc')
@@ -864,22 +820,16 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.IFrame(tx_seq=0), timeout=timedelta(0.5))
         assertThat(self.cert_channel).emits(
             L2capMatchers.IFrame(tx_seq=1), timeout=timedelta(0.5))
-        assertThat(self.cert_channel).emits(
-            L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL))
-
-        # Send SREJ with F not set
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-        self.cert_send_b_frame(s_frame)
+        assertThat(self.cert_channel).emits(L2capMatchers.SFrame(p=Poll.POLL))
 
+        self.cert_channel.send_s_frame(
+            req_seq=0, s=SupervisoryFunction.SELECT_REJECT)
         assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
-        # Send SREJ with F set
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
-        self.cert_send_b_frame(s_frame)
 
+        self.cert_channel.send_s_frame(
+            req_seq=0,
+            s=SupervisoryFunction.SELECT_REJECT,
+            f=Final.POLL_RESPONSE)
         assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
 
     def test_handle_receipt_rej_and_rr_with_f_set(self):
@@ -892,10 +842,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm()
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
         self.dut_channel.send(b'abc')
@@ -907,20 +854,12 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             L2capMatchers.SFrame(p=l2cap_packets.Poll.POLL),
             timeout=timedelta(2))
 
-        # Send REJ with F not set
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.REJECT,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-        self.cert_send_b_frame(s_frame)
-
+        self.cert_channel.send_s_frame(req_seq=0, s=SupervisoryFunction.REJECT)
         assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
 
         # Send RR with F set
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.REJECT,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
-        self.cert_send_b_frame(s_frame)
-
+        self.cert_channel.send_s_frame(
+            req_seq=0, s=SupervisoryFunction.REJECT, f=Final.POLL_RESPONSE)
         assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=0))
         assertThat(self.cert_channel).emits(L2capMatchers.IFrame(tx_seq=1))
 
@@ -933,10 +872,7 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
         self._setup_link_from_cert()
         self.cert_l2cap.turn_on_ertm()
 
-        scid = 0x41
-        self._open_channel(scid=scid, psm=0x33, use_ertm=True)
-
-        dcid = self.cert_l2cap.get_dcid(scid)
+        self._open_channel(scid=0x41, psm=0x33, use_ertm=True)
 
         self.dut_channel.send(b'abc')
         self.dut_channel.send(b'abc')
@@ -949,11 +885,8 @@ class L2capTest(GdFacadeOnlyBaseTestClass):
             timeout=timedelta(2))
 
         # Send SREJ with F not set
-        s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
-            dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
-            l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
-        self.cert_send_b_frame(s_frame)
-
+        self.cert_channel.send_s_frame(
+            req_seq=0, s=SupervisoryFunction.SELECT_REJECT)
         assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
 
         self.cert_channel.send_i_frame(