return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.DISCONNECTION_REQUEST)
@staticmethod
+ def DisconnectionResponse(scid, dcid):
+ return lambda packet: L2capMatchers._is_matching_disconnection_response(packet, scid, dcid)
+
+ @staticmethod
+ def CommandReject():
+ return lambda packet: L2capMatchers._is_control_frame_with_code(packet, CommandCode.COMMAND_REJECT)
+
+ @staticmethod
def _basic_frame(packet):
if packet is None:
return None
return response.GetSourceCid() == scid and response.GetResult(
) == ConnectionResponseResult.SUCCESS and response.GetDestinationCid(
) != 0
+
+ @staticmethod
+ def _is_matching_disconnection_response(packet, scid, dcid):
+ frame = L2capMatchers._control_frame_with_code(
+ packet, CommandCode.DISCONNECTION_RESPONSE)
+ if frame is None:
+ return False
+ response = l2cap_packets.DisconnectionResponseView(frame)
+ return response.GetSourceCid() == scid and response.GetDestinationCid(
+ ) == dcid
close_channel_l2cap = l2cap_packets.BasicFrameBuilder(1, close_channel)
self.cert_send_b_frame(close_channel_l2cap)
- def verify_disconnection_response(packet):
- packet_bytes = packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- if l2cap_control_view.GetCode(
- ) != l2cap_packets.CommandCode.DISCONNECTION_RESPONSE:
- return False
- disconnection_response_view = l2cap_packets.DisconnectionResponseView(
- l2cap_control_view)
- return disconnection_response_view.GetSourceCid(
- ) == scid and disconnection_response_view.GetDestinationCid() == dcid
-
- assertThat(self.cert_acl).emits(verify_disconnection_response)
+ assertThat(self.cert_acl).emits(
+ L2capMatchers.DisconnectionResponse(scid, dcid))
def test_disconnect_on_timeout(self):
"""
self._open_channel(1, scid, psm)
- def is_configuration_response(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- return l2cap_control_view.GetCode(
- ) == l2cap_packets.CommandCode.CONFIGURATION_RESPONSE
-
- cert_acl_data_stream.assert_none_matching(is_configuration_response)
+ cert_acl_data_stream.assert_none_matching(
+ L2capMatchers.ConfigurationResponse())
def test_retry_config_after_rejection(self):
"""
invalid_command_packet = b"\x04\x00\x01\x00\xff\x01\x00\x00"
self.cert_acl.send(invalid_command_packet)
- def is_command_reject(l2cap_packet):
- packet_bytes = l2cap_packet.payload
- l2cap_view = l2cap_packets.BasicFrameView(
- bt_packets.PacketViewLittleEndian(list(packet_bytes)))
- if l2cap_view.GetChannelId() != 1:
- return False
- l2cap_control_view = l2cap_packets.ControlView(
- l2cap_view.GetPayload())
- return l2cap_control_view.GetCode(
- ) == l2cap_packets.CommandCode.COMMAND_REJECT
-
- assertThat(self.cert_acl).emits(is_command_reject)
+ assertThat(self.cert_acl).emits(L2capMatchers.CommandReject())
def test_query_for_1_2_features(self):
"""