)
cert_packet_stream.unsubscribe()
+ def test_reject_unknown_command(self):
+ """
+ L2CAP/COS/CED/BI-01-C
+ """
+ cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
+ cert_connection_stream.subscribe()
+ self.device_under_test.l2cap.RegisterChannel(l2cap_facade_pb2.RegisterChannelRequest(channel=2))
+ self.device_under_test.l2cap.Connect(self.cert_address)
+ cert_connection_stream.assert_event_occurs(
+ lambda device: device.remote == self.dut_address
+ )
+ cert_connection_stream.unsubscribe()
+ cert_packet_stream = self.cert_device.l2cap.packet_stream
+ cert_packet_stream.subscribe()
+ invalid_command_packet = b"\xff\x01\x00\x00"
+ self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=invalid_command_packet))
+ command_reject_packet = b"\x01\x01\x02\x00\x00\x00"
+ cert_packet_stream.assert_event_occurs(
+ lambda packet: command_reject_packet in packet.payload
+ )
+ cert_packet_stream.unsubscribe()
+
+ time.sleep(ASYNC_OP_TIME_SECONDS) # TODO(b/144186649): Remove this line
}
default:
LOG_WARN("Unhandled event 0x%x", static_cast<int>(code));
+ auto builder = CommandRejectNotUnderstoodBuilder::Create(control_packet_view.GetIdentifier());
+ enqueue_buffer_->Enqueue(std::move(builder), handler_);
return;
}
}