self._open_channel(signal_id=1, scid=0x41, psm=0x33)
self.dut_channel.send(b'abc')
- assertThat(
- self.cert_channel).emits(lambda packet: b'abc' in packet.payload)
+ assertThat(self.cert_channel).emits(L2capMatchers.Data(b'abc'))
def test_fixed_channel(self):
self._setup_link_from_cert()
self.dut.l2cap.SendL2capPacket(
l2cap_facade_pb2.L2capPacket(channel=2, payload=b"123"))
- assertThat(
- self.cert_channel).emits(lambda packet: b'123' in packet.payload)
+ assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b'123'))
def test_receive_packet_from_unknown_channel(self):
self._setup_link_from_cert()
0x99, 0, l2cap_packets.Final.NOT_SET, 1,
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_l2cap.send_acl(i_frame)
- assertThat(self.cert_l2cap.get_acl_stream()).emitsNone(
- L2capMatchers.SupervisoryFrame(scid, req_seq=4),
+ assertThat(self.cert_channel).emitsNone(
+ L2capMatchers.SupervisoryFrame(req_seq=4),
timeout=timedelta(seconds=1))
def test_open_two_channels(self):
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut_channel.send(b'abc' * 34)
assertThat(self.cert_channel).emits(
- lambda packet: b'abc' * 34 in packet.payload)
+ L2capMatchers.PartialData(b'abc' * 34))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 0, l2cap_packets.Final.NOT_SET, 1,
self.dut.l2cap.OpenChannel(
l2cap_facade_pb2.OpenChannelRequest(
remote=self.cert_address, psm=psm))
- assertThat(self.cert_acl).emits(L2capMatchers.ConnectionRequest())
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ L2capMatchers.ConnectionRequest())
def test_accept_disconnect(self):
"""
close_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, close_channel)
self.cert_send_b_frame(close_channel_l2cap)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.DisconnectionResponse(scid, dcid))
def test_disconnect_on_timeout(self):
self._open_channel(1, scid, psm)
- assertThat(self.cert_acl).emitsNone(
+ assertThat(self.cert_l2cap.get_control_channel()).emitsNone(
L2capMatchers.ConfigurationResponse())
def test_retry_config_after_rejection(self):
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)
- assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ L2capMatchers.ConfigurationResponse())
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationRequest(), at_least_times=2)
def test_config_unknown_options_with_hint(self):
self._open_channel(signal_id=1, scid=0x41, psm=0x33)
- assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ L2capMatchers.ConfigurationResponse())
def test_respond_to_echo_request(self):
"""
Verify that the IUT responds to an echo request.
"""
self._setup_link_from_cert()
+ asserts.skip("is echo without a channel supported?")
echo_request = l2cap_packets.EchoRequestBuilder(
100, l2cap_packets.DisconnectionRequestBuilder(1, 2, 3))
echo_request_l2cap = l2cap_packets.BasicFrameBuilder(1, echo_request)
self.cert_send_b_frame(echo_request_l2cap)
- assertThat(self.cert_acl).emits(
- lambda packet: b"\x06\x01\x04\x00\x02\x00\x03\x00" in packet.payload
- )
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.PartialData(b"\x06\x01\x04\x00\x02\x00\x03\x00"))
def test_reject_unknown_command(self):
"""
"""
self._setup_link_from_cert()
+ asserts.skip("need to use packet builders")
invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00"
- self.cert_acl.send(invalid_command_packet)
+ self.cert_l2cap.get_control_channel().send(invalid_command_packet)
- assertThat(self.cert_acl).emits(L2capMatchers.CommandReject())
+ assertThat(self.cert_channel).emits(L2capMatchers.CommandReject())
def test_query_for_1_2_features(self):
"""
self.cert_send_b_frame(information_request_l2cap)
def is_correct_information_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
+ l2cap_packet.GetPayload())
if l2cap_control_view.GetCode(
) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
return False
return information_response_view.GetInfoType(
) == l2cap_packets.InformationRequestInfoType.EXTENDED_FEATURES_SUPPORTED
- assertThat(self.cert_acl).emits(is_correct_information_response)
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ is_correct_information_response)
def test_extended_feature_info_response_ertm(self):
"""
self.cert_send_b_frame(information_request_l2cap)
def is_correct_information_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
+ l2cap_packet.GetPayload())
if l2cap_control_view.GetCode(
) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
return False
information_response_view)
return extended_features_view.GetEnhancedRetransmissionMode()
- assertThat(self.cert_acl).emits(is_correct_information_response)
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ is_correct_information_response)
def test_extended_feature_info_response_streaming(self):
"""
self.cert_send_b_frame(information_request_l2cap)
def is_correct_information_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
+ l2cap_packet.GetPayload())
if l2cap_control_view.GetCode(
) != l2cap_packets.CommandCode.INFORMATION_RESPONSE:
return False
information_response_view)
return extended_features_view.GetFcsOption()
- assertThat(self.cert_acl).emits(is_correct_information_response)
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ is_correct_information_response)
def test_extended_feature_info_response_fixed_channels(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(self.cert_acl).emits(lambda packet: b"abc" in packet.payload)
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b'abc'))
def test_explicitly_request_use_FCS(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(
- self.cert_acl).emits(lambda packet: b"abc\x4f\xa3" in packet.payload
- ) # TODO: Use packet parser
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.PartialData(
+ b"abc\x4f\xa3")) # TODO: Use packet parser
def test_implicitly_request_use_FCS(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(
- self.cert_acl).emits(lambda packet: b"abc\x4f\xa3" in packet.payload
- ) # TODO: Use packet parser
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.PartialData(
+ b"abc\x4f\xa3")) # TODO: Use packet parser
def test_transmit_i_frames(self):
"""
dcid = self.cert_l2cap.get_dcid(scid)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(self.cert_acl).emits(lambda packet: b"abc" in packet.payload)
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc"))
# Assemble a sample packet. TODO: Use RawBuilder
SAMPLE_PACKET = l2cap_packets.CommandRejectNotUnderstoodBuilder(1)
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(self.cert_acl).emits(lambda packet: b"abc" in packet.payload)
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc"))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 1, l2cap_packets.Final.NOT_SET, 2,
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(self.cert_acl).emits(lambda packet: b"abc" in packet.payload)
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(L2capMatchers.PartialData(b"abc"))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 2, l2cap_packets.Final.NOT_SET, 3,
dcid = self.cert_l2cap.get_dcid(scid)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, req_seq=i + 1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(req_seq=i + 1))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 3, l2cap_packets.Final.NOT_SET, 0,
l2cap_packets.SegmentationAndReassembly.START, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, req_seq=4))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(req_seq=4))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 4, l2cap_packets.Final.NOT_SET, 0,
l2cap_packets.SegmentationAndReassembly.CONTINUATION, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, req_seq=5))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(req_seq=5))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 5, l2cap_packets.Final.NOT_SET, 0,
l2cap_packets.SegmentationAndReassembly.END, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, req_seq=6))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(req_seq=6))
def test_acknowledging_received_i_frames(self):
"""
dcid = self.cert_l2cap.get_dcid(scid)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, req_seq=i + 1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(req_seq=i + 1))
- assertThat(self.cert_acl).emitsNone(
- L2capMatchers.SupervisoryFrame(scid, req_seq=4),
+ assertThat(self.cert_channel).emitsNone(
+ L2capMatchers.SupervisoryFrame(req_seq=4),
timeout=timedelta(seconds=1))
def test_resume_transmitting_when_received_rr(self):
dcid = self.cert_l2cap.get_dcid(scid)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
+ self.dut_channel.send(b'abc')
+ self.dut_channel.send(b'def')
- # TODO: Besides checking TxSeq, we also want to check payload, once we can get it from packet view
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
- assertThat(self.cert_acl).emitsNone(
- L2capMatchers.InformationFrame(scid, tx_seq=1))
+ # TODO: Besides checking TxSeq, we also want to chpacketpayload, once we can get it from packet view
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
+ assertThat(self.cert_channel).emitsNone(
+ L2capMatchers.InformationFrame(tx_seq=1))
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 1)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=1))
def test_resume_transmitting_when_acknowledge_previously_sent(self):
"""
dcid = self.cert_l2cap.get_dcid(scid)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'def'))
+ self.dut_channel.send(b'abc')
+ self.dut_channel.send(b'def')
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
# TODO: If 1 second is greater than their retransmit timeout, use a smaller timeout
- assertThat(self.cert_acl).emitsNone(
- L2capMatchers.InformationFrame(scid, tx_seq=1),
+ assertThat(self.cert_channel).emitsNone(
+ L2capMatchers.InformationFrame(tx_seq=1),
timeout=timedelta(seconds=1))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=1))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 1, l2cap_packets.Final.NOT_SET, 2,
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut_channel.send(b'abc')
# TODO: Always use their retransmission timeout value
time.sleep(2)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL))
def test_transmit_s_frame_rr_with_final_bit_set(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(
- scid, f=l2cap_packets.Final.POLL_RESPONSE))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(f=l2cap_packets.Final.POLL_RESPONSE))
def test_s_frame_transmissions_exceed_max_transmit(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut_channel.send(b'abc')
# Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
time.sleep(362)
- assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ L2capMatchers.DisconnectionRequest())
def test_i_frame_transmissions_exceed_max_transmit(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut_channel.send(b'abc')
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ L2capMatchers.DisconnectionRequest())
def test_respond_to_rej(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut_channel.send(b'abc')
+ self.dut_channel.send(b'abc')
for i in range(2):
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=i),
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=i),
timeout=timedelta(seconds=0.5))
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
self.cert_send_b_frame(s_frame)
for i in range(2):
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=i),
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=i),
timeout=timedelta(seconds=0.5))
def test_receive_s_frame_rr_final_bit_set(self):
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut_channel.send(b'abc')
# TODO: Always use their retransmission timeout value
time.sleep(2)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL))
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
def test_receive_i_frame_final_bit_set(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
+ self.dut_channel.send(b'abc')
# TODO: Always use their retransmission timeout value
time.sleep(2)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
def test_recieve_rnr(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=0x33, payload=b'abc'))
+ self.dut_channel.send(b'abc')
# TODO: Always use their retransmission timeout value
time.sleep(2)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL))
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
dcid, l2cap_packets.SupervisoryFunction.RECEIVER_NOT_READY,
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emitsNone(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
+ assertThat(self.cert_channel).emitsNone(
+ L2capMatchers.InformationFrame(tx_seq=0))
def test_sent_rej_lost(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid, 0, l2cap_packets.Final.NOT_SET, 0,
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, req_seq=1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(req_seq=1))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, ertm_tx_window_size - 1, l2cap_packets.Final.NOT_SET, 0,
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_channel).emits(
L2capMatchers.SupervisoryFrame(
- scid, s=l2cap_packets.SupervisoryFunction.REJECT))
+ s=l2cap_packets.SupervisoryFunction.REJECT))
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
dcid, l2cap_packets.SupervisoryFunction.RECEIVER_READY,
l2cap_packets.Poll.POLL, l2cap_packets.Final.NOT_SET, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_channel).emits(
L2capMatchers.SupervisoryFrame(
- scid, req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE))
+ req_seq=1, f=l2cap_packets.Final.POLL_RESPONSE))
for i in range(1, ertm_tx_window_size):
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, i, l2cap_packets.Final.NOT_SET, 0,
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED,
SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, req_seq=i + 1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(req_seq=i + 1))
def test_handle_duplicate_srej(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0),
- timeout=timedelta(0.5))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=1),
- timeout=timedelta(0.5))
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL))
+ self.dut_channel.send(b'abc')
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0), timeout=timedelta(0.5))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=1), timeout=timedelta(0.5))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL))
# Send SREJ with F not set
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emitsNone(timeout=timedelta(seconds=0.5))
+ assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
# Send SREJ with F set
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
dcid, l2cap_packets.SupervisoryFunction.SELECT_REJECT,
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
def test_handle_receipt_rej_and_rr_with_f_set(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0),
- timeout=timedelta(0.5))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=1),
- timeout=timedelta(0.5))
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL),
+ self.dut_channel.send(b'abc')
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0), timeout=timedelta(0.5))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=1), timeout=timedelta(0.5))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL),
timeout=timedelta(2))
# Send REJ with F not set
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emitsNone(timeout=timedelta(seconds=0.5))
+ assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
# Send RR with F set
s_frame = l2cap_packets.EnhancedSupervisoryFrameBuilder(
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=1))
def test_handle_rej_and_i_frame_with_f_set(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
- assertThat(self.cert_acl).emits(
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
L2capMatchers.ConfigurationResponse(),
L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.cert_l2cap.get_dcid(scid)
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- self.dut.l2cap.SendDynamicChannelPacket(
- l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0),
- timeout=timedelta(0.5))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=1),
- timeout=timedelta(0.5))
- assertThat(self.cert_acl).emits(
- L2capMatchers.SupervisoryFrame(scid, p=l2cap_packets.Poll.POLL),
+ self.dut_channel.send(b'abc')
+ self.dut_channel.send(b'abc')
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0), timeout=timedelta(0.5))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=1), timeout=timedelta(0.5))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.SupervisoryFrame(p=l2cap_packets.Poll.POLL),
timeout=timedelta(2))
# Send SREJ with F not set
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.NOT_SET, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emitsNone(timeout=timedelta(seconds=0.5))
+ assertThat(self.cert_channel).emitsNone(timeout=timedelta(seconds=0.5))
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid, 0, l2cap_packets.Final.POLL_RESPONSE, 0,
l2cap_packets.SegmentationAndReassembly.UNSEGMENTED, SAMPLE_PACKET)
self.cert_send_b_frame(i_frame)
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=0))
- assertThat(self.cert_acl).emits(
- L2capMatchers.InformationFrame(scid, tx_seq=1))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=0))
+ assertThat(self.cert_channel).emits(
+ L2capMatchers.InformationFrame(tx_seq=1))
def test_initiated_configuration_request_ertm(self):
"""
# TODO: Fix this test. It doesn't work so far with PDL struct
- assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationRequest())
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ L2capMatchers.ConfigurationRequest())
asserts.skip("Struct not working")
def test_respond_configuration_request_ertm(self):
self.cert_send_b_frame(open_channel_l2cap)
# TODO: Verify that the type should be ERTM
- assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())
+ assertThat(self.cert_l2cap.get_control_channel()).emits(
+ L2capMatchers.ConfigurationResponse())