open_channels = self.cert_device.l2cap.FetchOpenedChannels(l2cap_cert_pb2.FetchOpenedChannelsRequest())
assert len(open_channels.dcid) == 2
+ def test_accept_disconnect(self):
+ """
+ L2CAP/COS/CED/BV-07-C
+ """
+ self.device_under_test.l2cap.OpenChannel(l2cap_facade_pb2.OpenChannelRequest(remote=self.cert_address, psm=0x01))
+ cert_connection_stream = self.cert_device.l2cap.connection_complete_stream
+ cert_connection_stream.subscribe()
+ self.device_under_test.l2cap.Connect(self.cert_address)
+ cert_connection_stream.assert_event_occurs(
+ lambda device: device.remote == self.dut_address
+ )
+ cert_connection_stream.unsubscribe()
+ time.sleep(ASYNC_OP_TIME_SECONDS)
+ cert_packet_stream = self.cert_device.l2cap.packet_stream
+ cert_packet_stream.subscribe()
+ open_channels = self.cert_device.l2cap.FetchOpenedChannels(l2cap_cert_pb2.FetchOpenedChannelsRequest())
+ cid = open_channels.dcid[0]
+ disconnection_request_packet = b"\x06\x01\x04\x00\x40\x00\x40\x01"
+ disconnection_response_packet = b"\x07\x01\x04\x00\x40\x00\x40\x01"
+ #TODO(b/143374372): Instead of hardcoding this, use packet builder
+ self.cert_device.l2cap.SendL2capPacket(l2cap_facade_pb2.L2capPacket(channel=1, payload=disconnection_request_packet))
+ cert_packet_stream.assert_event_occurs(
+ lambda packet: disconnection_response_packet in packet.payload
+ )
+ cert_packet_stream.unsubscribe()
+ time.sleep(ASYNC_OP_TIME_SECONDS) # TODO(b/144186649): Remove this line
+
def test_basic_operation_request_connection(self):
"""
L2CAP/COS/CED/BV-01-C [Request Connection]
lambda packet: echo_response_packet in packet.payload
)
cert_packet_stream.unsubscribe()
+ time.sleep(ASYNC_OP_TIME_SECONDS) # TODO(b/144186649): Remove this line
def test_reject_unknown_command(self):
"""
LOG_WARN("Disconnect request for an unknown channel");
return;
}
- auto builder = DisconnectionResponseBuilder::Create(signal_id.Value(), remote_cid, cid);
+ auto builder = DisconnectionResponseBuilder::Create(signal_id.Value(), cid, remote_cid);
enqueue_buffer_->Enqueue(std::move(builder), handler_);
channel->OnClosed(hci::ErrorCode::SUCCESS);
link_->FreeDynamicChannel(cid);