from cert.closable import safeClose
from cert.py_l2cap import PyL2cap
from cert.py_acl_manager import PyAclManager
+from cert.matchers import L2capMatchers
from facade import common_pb2
from facade import rootservice_pb2 as facade_rootservice
from google.protobuf import empty_pb2 as empty_proto
return l2cap_packets.BasicFrameView(
bt_packets.PacketViewLittleEndian(list(l2cap_packet.payload)))
- def get_control_frame(self, l2cap_packet, code):
- l2cap_view = self.get_basic_frame(l2cap_packet)
- if l2cap_view.GetChannelId() != 1:
- return None
- l2cap_control_view = l2cap_packets.ControlView(l2cap_view.GetPayload())
- if l2cap_control_view.GetCode() != code:
- return None
- return l2cap_control_view
-
- def is_correct_connection_request(self, l2cap_packet):
- return self.get_control_frame(
- l2cap_packet,
- l2cap_packets.CommandCode.CONNECTION_REQUEST) is not None
-
- def is_correct_configuration_response(self, l2cap_packet):
- control_frame = self.get_control_frame(
- l2cap_packet, l2cap_packets.CommandCode.CONFIGURATION_RESPONSE)
- if control_frame is None:
- return False
- configuration_response_view = l2cap_packets.ConfigurationResponseView(
- control_frame)
- return configuration_response_view.GetResult(
- ) == l2cap_packets.ConfigurationResponseResult.SUCCESS
-
- def is_correct_configuration_request(self, l2cap_packet):
- return self.get_control_frame(
- l2cap_packet,
- l2cap_packets.CommandCode.CONFIGURATION_REQUEST) is not None
-
- def is_correct_disconnection_request(self, l2cap_packet):
- return self.get_control_frame(
- l2cap_packet,
- l2cap_packets.CommandCode.DISCONNECTION_REQUEST) is not None
-
def cert_send_b_frame(self, b_frame):
self.cert_acl.send(b_frame.Serialize())
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
self.dut.l2cap.OpenChannel(
l2cap_facade_pb2.OpenChannelRequest(
remote=self.cert_address, psm=psm))
- assertThat(self.cert_acl).emits(self.is_correct_connection_request)
+ assertThat(self.cert_acl).emits(L2capMatchers.ConnectionRequest())
def test_accept_disconnect(self):
"""
psm,
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.BASIC)
- assertThat(self.cert_acl).emits(self.is_correct_configuration_response)
+ assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_request, at_least_times=2)
+ L2capMatchers.ConfigurationRequest(), at_least_times=2)
def test_respond_to_echo_request(self):
"""
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut.l2cap.SendDynamicChannelPacket(
l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut.l2cap.SendDynamicChannelPacket(
l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut.l2cap.SendDynamicChannelPacket(
l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
dcid = self.scid_to_dcid[scid]
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut.l2cap.SendDynamicChannelPacket(
l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
dcid = self.scid_to_dcid[scid]
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
for i in range(3):
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid = self.scid_to_dcid[scid]
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
for i in range(3):
i_frame = l2cap_packets.EnhancedInformationFrameBuilder(
dcid = self.scid_to_dcid[scid]
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut.l2cap.SendDynamicChannelPacket(
l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
dcid = self.scid_to_dcid[scid]
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut.l2cap.SendDynamicChannelPacket(
l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
self.dut.l2cap.SendDynamicChannelPacket(
l2cap_facade_pb2.DynamicChannelPacket(psm=psm, payload=b'abc'))
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
# Retransmission timer = 2, 20 * monitor timer = 360, so total timeout is 362
time.sleep(362)
- assertThat(self.cert_acl).emits(self.is_correct_disconnection_request)
+ assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())
def test_i_frame_transmissions_exceed_max_transmit(self):
"""
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
l2cap_packets.Poll.NOT_SET, l2cap_packets.Final.POLL_RESPONSE, 0)
self.cert_send_b_frame(s_frame)
- assertThat(self.cert_acl).emits(self.is_correct_disconnection_request)
+ assertThat(self.cert_acl).emits(L2capMatchers.DisconnectionRequest())
def test_respond_to_rej(self):
"""
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
mode=l2cap_facade_pb2.RetransmissionFlowControlMode.ERTM)
assertThat(self.cert_acl).emits(
- self.is_correct_configuration_response,
- self.is_correct_configuration_request).inAnyOrder()
+ L2capMatchers.ConfigurationResponse(),
+ L2capMatchers.ConfigurationRequest()).inAnyOrder()
dcid = self.scid_to_dcid[scid]
# TODO: Fix this test. It doesn't work so far with PDL struct
- assertThat(self.cert_acl).emits(self.is_correct_configuration_request)
+ assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationRequest())
asserts.skip("Struct not working")
def test_respond_configuration_request_ertm(self):
self.cert_send_b_frame(open_channel_l2cap)
# TODO: Verify that the type should be ERTM
- assertThat(self.cert_acl).emits(self.is_correct_configuration_response)
+ assertThat(self.cert_acl).emits(L2capMatchers.ConfigurationResponse())