from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
from cert.event_stream import EventStream
from cert.py_l2cap import PyL2cap
+from cert.py_acl_manager import PyAclManager
from facade import common_pb2
from facade import rootservice_pb2 as facade_rootservice
from google.protobuf import empty_pb2 as empty_proto
self.ertm_max_transmit = 20
self.dut_l2cap = PyL2cap(self.dut)
+ self.cert_acl_manager = PyAclManager(self.cert)
+
+ def teardown_test(self):
+ self.cert_acl_manager.close()
+ super().teardown_test()
def _on_connection_request_default(self, l2cap_control_view):
connection_request_view = l2cap_packets.ConnectionRequestView(
self.dut.neighbor.EnablePageScan(
neighbor_facade.EnableMsg(enabled=True))
- with EventStream(
- self.cert.hci_acl_manager.CreateConnection(
- acl_manager_facade.ConnectionMsg(
- address_type=int(
- hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
- address=bytes(self.dut_address.address)))
- ) as connection_event_stream:
-
- # Cert gets ConnectionComplete with a handle and sends ACL data
- handle = 0xfff
-
- def get_handle(packet):
- packet_bytes = packet.event
- if b'\x03\x0b\x00' in packet_bytes:
- nonlocal handle
- cc_view = hci_packets.ConnectionCompleteView(
- hci_packets.EventPacketView(
- bt_packets.PacketViewLittleEndian(
- list(packet_bytes))))
- handle = cc_view.GetConnectionHandle()
- return True
- return False
-
- connection_event_stream.assert_event_occurs(get_handle)
-
- self.cert_acl_handle = handle
- return handle
+ with self.cert_acl_manager.initiate_connection(
+ self.dut.address) as cert_acl:
+ cert_acl.wait_for_connection_complete()
+ self.cert_acl_handle = cert_acl.handle ## HACK HACK HACK
+ return cert_acl.handle
def _open_channel(
self,
def test_connect_dynamic_channel_and_send_data(self):
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- psm = 0x33
- scid = 0x41
- self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
- psm)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b'abc' in packet.payload)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b'abc' in packet.payload)
def test_fixed_channel(self):
cert_acl_handle = self._setup_link_from_cert()
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
self.dut.l2cap.RegisterChannel(
l2cap_facade_pb2.RegisterChannelRequest(channel=2))
asserts.skip("FIXME: Not working")
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- self.dut.l2cap.SendL2capPacket(
- l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b'123' in packet.payload)
+ self.dut.l2cap.SendL2capPacket(
+ l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b'123' in packet.payload)
def test_receive_packet_from_unknown_channel(self):
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- psm = 0x33
- scid = 0x41
- self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
- psm)
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- 0x99, 0, l2cap_packets.Final.NOT_SET, 1,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_none_matching(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
- timedelta(seconds=1))
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ 0x99, 0, l2cap_packets.Final.NOT_SET, 1,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+ cert_acl_data_stream.assert_none_matching(
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
+ timedelta(seconds=1))
def test_open_two_channels(self):
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, 0x41,
- 0x41)
- self._open_channel(cert_acl_data_stream, 2, cert_acl_handle, 0x43,
- 0x43)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, 0x41, 0x41)
+ self._open_channel(cert_acl_data_stream, 2, cert_acl_handle, 0x43, 0x43)
def test_connect_and_send_data_ertm_no_segmentation(self):
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- dcid = self.scid_to_dcid[scid]
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(
- psm=psm, payload=b'abc' * 34))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b'abc' * 34 in packet.payload)
-
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 0, l2cap_packets.Final.NOT_SET, 1,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc' * 34))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b'abc' * 34 in packet.payload)
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 0, l2cap_packets.Final.NOT_SET, 1,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
def test_basic_operation_request_connection(self):
"""
initiate the configuration procedure.
"""
cert_acl_handle = self._setup_link_from_cert()
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- psm = 0x33
- # TODO: Use another test case
- self.dut.l2cap.OpenChannel(
- l2cap_facade_pb2.OpenChannelRequest(
- remote=self.cert_address, psm=psm))
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_connection_request)
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ psm = 0x33
+ # TODO: Use another test case
+ self.dut.l2cap.OpenChannel(
+ l2cap_facade_pb2.OpenChannelRequest(
+ remote=self.cert_address, psm=psm))
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_connection_request)
def test_accept_disconnect(self):
"""
L2CAP/COS/CED/BV-07-C
"""
cert_acl_handle = self._setup_link_from_cert()
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- scid = 0x41
- psm = 0x33
- self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
- psm)
-
- dcid = self.scid_to_dcid[scid]
-
- close_channel = l2cap_packets.DisconnectionRequestBuilder(
- 1, dcid, scid)
- close_channel_l2cap = l2cap_packets.BasicFrameBuilder(
- 1, close_channel)
- self.cert_send_b_frame(close_channel_l2cap)
-
- def verify_disconnection_response(packet):
- packet_bytes = packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- if l2cap_control_view.GetCode(
- ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE:
- return False
- disconnection_response_view = l2cap_packets.DisconnectionResponseView(
- l2cap_control_view)
- return disconnection_response_view.GetSourceCid(
- ) == scid and disconnection_response_view.GetDestinationCid(
- ) == dcid
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- cert_acl_data_stream.assert_event_occurs(
- verify_disconnection_response)
+ scid = 0x41
+ psm = 0x33
+ self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
+
+ dcid = self.scid_to_dcid[scid]
+
+ close_channel = l2cap_packets.DisconnectionRequestBuilder(1, dcid, scid)
+ close_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, close_channel)
+ self.cert_send_b_frame(close_channel_l2cap)
+
+ def verify_disconnection_response(packet):
+ packet_bytes = packet.payload
+ l2cap_view = l2cap_packets.BasicFrameView(
+ bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+ l2cap_control_view = l2cap_packets.ControlView(
+ l2cap_view.GetPayload())
+ if l2cap_control_view.GetCode(
+ ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE:
+ return False
+ disconnection_response_view = l2cap_packets.DisconnectionResponseView(
+ l2cap_control_view)
+ return disconnection_response_view.GetSourceCid(
+ ) == scid and disconnection_response_view.GetDestinationCid() == dcid
+
+ cert_acl_data_stream.assert_event_occurs(verify_disconnection_response)
def test_disconnect_on_timeout(self):
"""
L2CAP/COS/CED/BV-08-C
"""
cert_acl_handle = self._setup_link_from_cert()
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+
+ scid = 0x41
+ psm = 0x33
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+
+ # Don't send configuration request or response back
+ self.on_configuration_request = lambda _: True
+ self.on_connection_response = lambda _: True
+
+ self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid, psm)
+
+ def is_configuration_response(l2cap_packet):
+ packet_bytes = l2cap_packet.payload
+ l2cap_view = l2cap_packets.BasicFrameView(
+ bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+ if l2cap_view.GetChannelId() != 1:
+ return False
+ l2cap_control_view = l2cap_packets.ControlView(
+ l2cap_view.GetPayload())
+ return l2cap_control_view.GetCode(
+ ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- scid = 0x41
- psm = 0x33
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- # Don't send configuration request or response back
- self.on_configuration_request = lambda _: True
- self.on_connection_response = lambda _: True
-
- self._open_channel(cert_acl_data_stream, 1, cert_acl_handle, scid,
- psm)
-
- def is_configuration_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- return l2cap_control_view.GetCode(
- ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE
-
- cert_acl_data_stream.assert_none_matching(is_configuration_response)
+ cert_acl_data_stream.assert_none_matching(is_configuration_response)
def test_retry_config_after_rejection(self):
"""
L2CAP/COS/CFD/BV-02-C
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- psm = 0x33
- scid = 0x41
+ psm = 0x33
+ scid = 0x41
- self.on_configuration_request = self._on_configuration_request_unacceptable_parameters
+ self.on_configuration_request = self._on_configuration_request_unacceptable_parameters
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request, at_least_times=2)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request, at_least_times=2)
def test_respond_to_echo_request(self):
"""
Verify that the IUT responds to an echo request.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- echo_request = l2cap_packets.EchoRequestBuilder(
- 100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3))
- echo_request_l2cap = l2cap_packets.BasicFrameBuilder(
- 1, echo_request)
- self.cert_send_b_frame(echo_request_l2cap)
+ echo_request = l2cap_packets.EchoRequestBuilder(
+ 100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3))
+ echo_request_l2cap = l2cap_packets.BasicFrameBuilder(1, echo_request)
+ self.cert_send_b_frame(echo_request_l2cap)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload
- )
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload
+ )
def test_reject_unknown_command(self):
"""
L2CAP/COS/CED/BI-01-C
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00"
- self.cert.hci_acl_manager.SendAclData(
- acl_manager_facade.AclData(
- handle=cert_acl_handle,
- payload=bytes(invalid_command_packet)))
-
- def is_command_reject(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- return l2cap_control_view.GetCode(
- ) == l2cap_packets.CommandCode.COMMAND_REJECT
-
- cert_acl_data_stream.assert_event_occurs(is_command_reject)
+ invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00"
+ self.cert.hci_acl_manager.SendAclData(
+ acl_manager_facade.AclData(
+ handle=cert_acl_handle, payload=bytes(invalid_command_packet)))
+
+ def is_command_reject(l2cap_packet):
+ packet_bytes = l2cap_packet.payload
+ l2cap_view = l2cap_packets.BasicFrameView(
+ bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+ if l2cap_view.GetChannelId() != 1:
+ return False
+ l2cap_control_view = l2cap_packets.ControlView(
+ l2cap_view.GetPayload())
+ return l2cap_control_view.GetCode(
+ ) == l2cap_packets.CommandCode.COMMAND_REJECT
+
+ cert_acl_data_stream.assert_event_occurs(is_command_reject)
def test_query_for_1_2_features(self):
"""
L2CAP/COS/IEX/BV-01-C [Query for 1.2 Features]
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- signal_id = 3
- information_request = l2cap_packets.InformationRequestBuilder(
- signal_id, l2cap_packets.InformationRequestInfoType.
- EXTENDED_FEATURES_SUPPORTED)
- information_request_l2cap = l2cap_packets.BasicFrameBuilder(
- 1, information_request)
- self.cert_send_b_frame(information_request_l2cap)
-
- def is_correct_information_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- if l2cap_control_view.GetCode(
- ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
- return False
- information_response_view = l2cap_packets.InformationResponseView(
- l2cap_control_view)
- return information_response_view.GetInfoType(
- ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ signal_id = 3
+ information_request = l2cap_packets.InformationRequestBuilder(
+ signal_id,
+ l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+ )
+ information_request_l2cap = l2cap_packets.BasicFrameBuilder(
+ 1, information_request)
+ self.cert_send_b_frame(information_request_l2cap)
- cert_acl_data_stream.assert_event_occurs(
- is_correct_information_response)
+ def is_correct_information_response(l2cap_packet):
+ packet_bytes = l2cap_packet.payload
+ l2cap_view = l2cap_packets.BasicFrameView(
+ bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+ if l2cap_view.GetChannelId() != 1:
+ return False
+ l2cap_control_view = l2cap_packets.ControlView(
+ l2cap_view.GetPayload())
+ if l2cap_control_view.GetCode(
+ ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
+ return False
+ information_response_view = l2cap_packets.InformationResponseView(
+ l2cap_control_view)
+ return information_response_view.GetInfoType(
+ ) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+
+ cert_acl_data_stream.assert_event_occurs(
+ is_correct_information_response)
def test_extended_feature_info_response_ertm(self):
"""
Retransmission Mode]
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- signal_id = 3
- information_request = l2cap_packets.InformationRequestBuilder(
- signal_id, l2cap_packets.InformationRequestInfoType.
- EXTENDED_FEATURES_SUPPORTED)
- information_request_l2cap = l2cap_packets.BasicFrameBuilder(
- 1, information_request)
- self.cert_send_b_frame(information_request_l2cap)
-
- def is_correct_information_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- if l2cap_control_view.GetCode(
- ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
- return False
- information_response_view = l2cap_packets.InformationResponseView(
- l2cap_control_view)
- if information_response_view.GetInfoType(
- ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
- return False
- extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
- information_response_view)
- return extended_features_view.GetEnhancedRetransmissionMode()
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- cert_acl_data_stream.assert_event_occurs(
- is_correct_information_response)
+ signal_id = 3
+ information_request = l2cap_packets.InformationRequestBuilder(
+ signal_id,
+ l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+ )
+ information_request_l2cap = l2cap_packets.BasicFrameBuilder(
+ 1, information_request)
+ self.cert_send_b_frame(information_request_l2cap)
+
+ def is_correct_information_response(l2cap_packet):
+ packet_bytes = l2cap_packet.payload
+ l2cap_view = l2cap_packets.BasicFrameView(
+ bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+ if l2cap_view.GetChannelId() != 1:
+ return False
+ l2cap_control_view = l2cap_packets.ControlView(
+ l2cap_view.GetPayload())
+ if l2cap_control_view.GetCode(
+ ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
+ return False
+ information_response_view = l2cap_packets.InformationResponseView(
+ l2cap_control_view)
+ if information_response_view.GetInfoType(
+ ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
+ return False
+ extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
+ information_response_view)
+ return extended_features_view.GetEnhancedRetransmissionMode()
+
+ cert_acl_data_stream.assert_event_occurs(
+ is_correct_information_response)
def test_extended_feature_info_response_fcs(self):
"""
Note: This is not mandated by L2CAP Spec
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- signal_id = 3
- information_request = l2cap_packets.InformationRequestBuilder(
- signal_id, l2cap_packets.InformationRequestInfoType.
- EXTENDED_FEATURES_SUPPORTED)
- information_request_l2cap = l2cap_packets.BasicFrameBuilder(
- 1, information_request)
- self.cert_send_b_frame(information_request_l2cap)
-
- def is_correct_information_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- if l2cap_control_view.GetCode(
- ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
- return False
- information_response_view = l2cap_packets.InformationResponseView(
- l2cap_control_view)
- if information_response_view.GetInfoType(
- ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
- return False
- extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
- information_response_view)
- return extended_features_view.GetFcsOption()
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- cert_acl_data_stream.assert_event_occurs(
- is_correct_information_response)
+ signal_id = 3
+ information_request = l2cap_packets.InformationRequestBuilder(
+ signal_id,
+ l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
+ )
+ information_request_l2cap = l2cap_packets.BasicFrameBuilder(
+ 1, information_request)
+ self.cert_send_b_frame(information_request_l2cap)
+
+ def is_correct_information_response(l2cap_packet):
+ packet_bytes = l2cap_packet.payload
+ l2cap_view = l2cap_packets.BasicFrameView(
+ bt_packets.PacketViewLittleEndian(list(packet_bytes)))
+ if l2cap_view.GetChannelId() != 1:
+ return False
+ l2cap_control_view = l2cap_packets.ControlView(
+ l2cap_view.GetPayload())
+ if l2cap_control_view.GetCode(
+ ) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
+ return False
+ information_response_view = l2cap_packets.InformationResponseView(
+ l2cap_control_view)
+ if information_response_view.GetInfoType(
+ ) != l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED:
+ return False
+ extended_features_view = l2cap_packets.InformationResponseExtendedFeaturesView(
+ information_response_view)
+ return extended_features_view.GetFcsOption()
+
+ cert_acl_data_stream.assert_event_occurs(
+ is_correct_information_response)
def test_config_channel_not_use_FCS(self):
"""
Verify the IUT can configure a channel to not use FCS in I/S-frames.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b"abc" in packet.payload)
+ self.on_connection_response = self._on_connection_response_use_ertm
+
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b"abc" in packet.payload)
def test_explicitly_request_use_FCS(self):
"""
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b"abc\x4f\xa3" in packet.payload
- ) # TODO: Use packet parser
+ self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b"abc\x4f\xa3" in packet.payload
+ ) # TODO: Use packet parser
def test_implicitly_request_use_FCS(self):
"""
TODO: Update this test case. What's the difference between this one and test_explicitly_request_use_FCS?
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b"abc\x4f\xa3" in packet.payload
- ) # TODO: Use packet parser
+ self.on_connection_response = self._on_connection_response_use_ertm_and_fcs
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b"abc\x4f\xa3" in packet.payload
+ ) # TODO: Use packet parser
def test_transmit_i_frames(self):
"""
L2CAP/ERM/BV-01-C [Transmit I-frames]
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
-
- self.on_connection_response = self._on_connection_response_use_ertm
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- dcid = self.scid_to_dcid[scid]
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b"abc" in packet.payload)
-
- # Assemble a sample packet. TODO: Use RawBuilder
- SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
-
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 0, l2cap_packets.Final.NOT_SET, 1,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b"abc" in packet.payload)
-
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 1, l2cap_packets.Final.NOT_SET, 2,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: b"abc" in packet.payload)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 2, l2cap_packets.Final.NOT_SET, 3,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
+ self.on_connection_response = self._on_connection_response_use_ertm
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ dcid = self.scid_to_dcid[scid]
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b"abc" in packet.payload)
+
+ # Assemble a sample packet. TODO: Use RawBuilder
+ SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 0, l2cap_packets.Final.NOT_SET, 1,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b"abc" in packet.payload)
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 1, l2cap_packets.Final.NOT_SET, 2,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: b"abc" in packet.payload)
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 2, l2cap_packets.Final.NOT_SET, 3,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
def test_receive_i_frames(self):
"""
Verify the IUT can receive in-sequence valid I-frames and deliver L2CAP SDUs to the Upper Tester
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- dcid = self.scid_to_dcid[scid]
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- for i in range(3):
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, i, l2cap_packets.Final.NOT_SET, 0,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
- )
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 3, l2cap_packets.Final.NOT_SET, 0,
- l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ dcid = self.scid_to_dcid[scid]
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ for i in range(3):
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 4, l2cap_packets.Final.NOT_SET, 0,
- l2cap_packets.SegmentationAndReassembly.CONTINUATION,
+ dcid, i, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 5
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
)
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 5, l2cap_packets.Final.NOT_SET, 0,
- l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 6
- )
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 3, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4
+ )
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 4, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.CONTINUATION, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 5
+ )
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 5, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 6
+ )
def test_acknowledging_received_i_frames(self):
"""
Lower Tester
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- dcid = self.scid_to_dcid[scid]
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
+
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ dcid = self.scid_to_dcid[scid]
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ for i in range(3):
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, i, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
+ SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- for i in range(3):
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, i, l2cap_packets.Final.NOT_SET, 0,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
- )
-
- cert_acl_data_stream.assert_none_matching(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
- timedelta(seconds=1))
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
+ )
+
+ cert_acl_data_stream.assert_none_matching(
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 4,
+ timedelta(seconds=1))
def test_resume_transmitting_when_received_rr(self):
"""
"""
self.ertm_tx_window_size = 1
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- dcid = self.scid_to_dcid[scid]
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
-
- # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
- cert_acl_data_stream.assert_none_matching(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
- )
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
- 1)
- self.cert_send_b_frame(s_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ dcid = self.scid_to_dcid[scid]
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
+
+ # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+ cert_acl_data_stream.assert_none_matching(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+ )
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1)
+ self.cert_send_b_frame(s_frame)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
def test_resume_transmitting_when_acknowledge_previously_sent(self):
"""
"""
self.ertm_tx_window_size = 1
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- dcid = self.scid_to_dcid[scid]
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
-
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
- # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
- cert_acl_data_stream.assert_none_matching(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
- timedelta(seconds=1))
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 0, l2cap_packets.Final.NOT_SET, 1,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
-
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
- )
-
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 1, l2cap_packets.Final.NOT_SET, 2,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ dcid = self.scid_to_dcid[scid]
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
+
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+ # TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
+ cert_acl_data_stream.assert_none_matching(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+ timedelta(seconds=1))
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 0, l2cap_packets.Final.NOT_SET, 1,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 1, l2cap_packets.Final.NOT_SET, 2,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
def test_transmit_s_frame_rr_with_poll_bit_set(self):
"""
Verify the IUT sends an S-frame [RR] with the Poll bit set when its retransmission timer expires.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- # TODO: Always use their retransmission timeout value
- time.sleep(2)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ # TODO: Always use their retransmission timeout value
+ time.sleep(2)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+ )
def test_transmit_s_frame_rr_with_final_bit_set(self):
"""
with the Poll bit set.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- dcid = self.scid_to_dcid[scid]
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
- l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
- self.cert_send_b_frame(s_frame)
-
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+ l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
+ self.cert_send_b_frame(s_frame)
+
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE
+ )
def test_s_frame_transmissions_exceed_max_transmit(self):
"""
"""
asserts.skip("Need to configure DUT to have a shorter timer")
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
- # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
- time.sleep(362)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_disconnection_request)
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+
+ # Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
+ time.sleep(362)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_disconnection_request)
def test_i_frame_transmissions_exceed_max_transmit(self):
"""
not acknowledge the previous I-frame sent by the IUT.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
+ dcid = self.scid_to_dcid[scid]
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
- 0)
- self.cert_send_b_frame(s_frame)
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_disconnection_request)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+ self.cert_send_b_frame(s_frame)
+
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_disconnection_request)
def test_respond_to_rej(self):
"""
self.ertm_tx_window_size = 2
self.ertm_max_transmit = 2
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- for i in range(2):
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
- timeout=timedelta(seconds=0.5))
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.REJECT,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
- self.cert_send_b_frame(s_frame)
+ dcid = self.scid_to_dcid[scid]
- for i in range(2):
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
- timeout=timedelta(seconds=0.5))
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ for i in range(2):
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
+ timeout=timedelta(seconds=0.5))
+
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.REJECT,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+ self.cert_send_b_frame(s_frame)
+
+ for i in range(2):
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == i,
+ timeout=timedelta(seconds=0.5))
def test_receive_s_frame_rr_final_bit_set(self):
"""
[RR] with the Final Bit set.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-
- # TODO: Always use their retransmission timeout value
- time.sleep(2)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+
+ # TODO: Always use their retransmission timeout value
+ time.sleep(2)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+ )
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
- 0)
- self.cert_send_b_frame(s_frame)
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+ self.cert_send_b_frame(s_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
def test_receive_i_frame_final_bit_set(self):
"""
with the final bit set.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
-
- # TODO: Always use their retransmission timeout value
- time.sleep(2)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+
+ # TODO: Always use their retransmission timeout value
+ time.sleep(2)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+ )
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
def test_recieve_rnr(self):
"""
Lower Tester (S-frame [RNR]).
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- dcid = self.scid_to_dcid[scid]
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- # TODO: Always use their retransmission timeout value
- time.sleep(2)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+
+ # TODO: Always use their retransmission timeout value
+ time.sleep(2)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+ )
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
- 0)
- self.cert_send_b_frame(s_frame)
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+ self.cert_send_b_frame(s_frame)
- cert_acl_data_stream.assert_none_matching(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
+ cert_acl_data_stream.assert_none_matching(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
def test_sent_rej_lost(self):
"""
"""
self.ertm_tx_window_size = 5
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 0, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1
+ )
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 0, l2cap_packets.Final.NOT_SET, 0,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1
- )
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_s_from_ertm_s_frame(scid, packet) == l2cap_packets.SupervisoryFunction.REJECT
+ )
+
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
+ l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
+ self.cert_send_b_frame(s_frame)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 and self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE)
+ for i in range(1, self.ertm_tx_window_size):
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, self.ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET,
- 0, l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
+ dcid, i, l2cap_packets.Final.NOT_SET, 0,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_s_from_ertm_s_frame(scid, packet) == l2cap_packets.SupervisoryFunction.REJECT
+ lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
)
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
- l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
- self.cert_send_b_frame(s_frame)
-
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == 1 and self.get_f_from_ertm_s_frame(scid, packet) == l2cap_packets.Final.POLL_RESPONSE)
- for i in range(1, self.ertm_tx_window_size):
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, i, l2cap_packets.Final.NOT_SET, 0,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_req_seq_from_ertm_s_frame(scid, packet) == i + 1
- )
-
def test_handle_duplicate_srej(self):
"""
L2CAP/ERM/BI-03-C [Handle Duplicate S-Frame [SREJ]]
Verify the IUT will only retransmit the requested I-frame once after receiving a duplicate SREJ.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
- timeout=timedelta(0.5))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
- timeout=timedelta(0.5))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
+ timeout=timedelta(0.5))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+ timeout=timedelta(0.5))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL
+ )
- # Send SREJ with F not set
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
- self.cert_send_b_frame(s_frame)
+ # Send SREJ with F not set
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+ self.cert_send_b_frame(s_frame)
- cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
- # Send SREJ with F set
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
- 0)
- self.cert_send_b_frame(s_frame)
+ cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
+ # Send SREJ with F set
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+ self.cert_send_b_frame(s_frame)
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
def test_handle_receipt_rej_and_rr_with_f_set(self):
"""
retransmitted.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
-
- dcid = self.scid_to_dcid[scid]
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
- timeout=timedelta(0.5))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
- timeout=timedelta(0.5))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
- timeout=timedelta(2))
-
- # Send REJ with F not set
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.REJECT,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
- self.cert_send_b_frame(s_frame)
-
- cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- # Send RR with F set
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.REJECT,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE,
- 0)
- self.cert_send_b_frame(s_frame)
-
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
+ timeout=timedelta(0.5))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+ timeout=timedelta(0.5))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
+ timeout=timedelta(2))
+
+ # Send REJ with F not set
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.REJECT,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+ self.cert_send_b_frame(s_frame)
+
+ cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
+
+ # Send RR with F set
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.REJECT,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
+ self.cert_send_b_frame(s_frame)
+
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
def test_handle_rej_and_i_frame_with_f_set(self):
"""
followed by an I-frame with the Final bit set that indicates the same I-frames should be retransmitted.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # FIXME: Order shouldn't matter here
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- dcid = self.scid_to_dcid[scid]
-
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
- timeout=timedelta(0.5))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
- timeout=timedelta(0.5))
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
- timeout=timedelta(2))
-
- # Send SREJ with F not set
- s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
- dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
- l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
- self.cert_send_b_frame(s_frame)
-
- cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
-
- i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
- dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
- l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
- SAMPLE_PACKET)
- self.cert_send_b_frame(i_frame)
-
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0
- )
- cert_acl_data_stream.assert_event_occurs(
- lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1
- )
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # FIXME: Order shouldn't matter here
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+
+ dcid = self.scid_to_dcid[scid]
+
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut.l2cap.SendDynamicChannelPacket(
+ l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0,
+ timeout=timedelta(0.5))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1,
+ timeout=timedelta(0.5))
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_p_from_ertm_s_frame(scid, packet) == l2cap_packets.Poll.POLL,
+ timeout=timedelta(2))
+
+ # Send SREJ with F not set
+ s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
+ dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
+ l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
+ self.cert_send_b_frame(s_frame)
+
+ cert_acl_data_stream.assert_none(timeout=timedelta(seconds=0.5))
+
+ i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
+ dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
+ l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
+ self.cert_send_b_frame(i_frame)
+
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 0)
+ cert_acl_data_stream.assert_event_occurs(
+ lambda packet: self.get_tx_seq_from_ertm_i_frame(scid, packet) == 1)
def test_initiated_configuration_request_ertm(self):
"""
Enhanced Retransmission Mode.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- self.on_connection_response = self._on_connection_response_use_ertm
-
- psm = 0x33
- scid = 0x41
- self._open_channel(
- cert_acl_data_stream,
- 1,
- cert_acl_handle,
- scid,
- psm,
- mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
-
- # TODO: Fix this test. It doesn't work so far with PDL struct
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ self.on_connection_response = self._on_connection_response_use_ertm
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_request)
- asserts.skip("Struct not working")
+ psm = 0x33
+ scid = 0x41
+ self._open_channel(
+ cert_acl_data_stream,
+ 1,
+ cert_acl_handle,
+ scid,
+ psm,
+ mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
+
+ # TODO: Fix this test. It doesn't work so far with PDL struct
+
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_request)
+ asserts.skip("Struct not working")
def test_respond_configuration_request_ertm(self):
"""
that specifies Enhanced Retransmission Mode.
"""
cert_acl_handle = self._setup_link_from_cert()
- with EventStream(
- self.cert.hci_acl_manager.FetchAclData(
- empty_proto.Empty())) as cert_acl_data_stream:
- cert_acl_data_stream.register_callback(self._handle_control_packet)
- psm = 1
- scid = 0x0101
- self.retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM
- self.dut.l2cap.SetDynamicChannel(
- l2cap_facade_pb2.SetEnableDynamicChannelRequest(
- psm=psm, retransmission_mode=self.retransmission_mode))
-
- open_channel = l2cap_packets.ConnectionRequestBuilder(1, psm, scid)
- open_channel_l2cap = l2cap_packets.BasicFrameBuilder(
- 1, open_channel)
- self.cert_send_b_frame(open_channel_l2cap)
-
- # TODO: Verify that the type should be ERTM
- cert_acl_data_stream.assert_event_occurs(
- self.is_correct_configuration_response)
+ cert_acl_data_stream = self.cert_acl_manager.get_acl_stream()
+ cert_acl_data_stream.register_callback(self._handle_control_packet)
+ psm = 1
+ scid = 0x0101
+ self.retransmission_mode = l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM
+ self.dut.l2cap.SetDynamicChannel(
+ l2cap_facade_pb2.SetEnableDynamicChannelRequest(
+ psm=psm, retransmission_mode=self.retransmission_mode))
+
+ open_channel = l2cap_packets.ConnectionRequestBuilder(1, psm, scid)
+ open_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, open_channel)
+ self.cert_send_b_frame(open_channel_l2cap)
+
+ # TODO: Verify that the type should be ERTM
+ cert_acl_data_stream.assert_event_occurs(
+ self.is_correct_configuration_response)