pub struct Connection {
rx: Receiver<Bytes>,
tx: Sender<Bytes>,
+ handle: u16,
+ requests: Sender<Request>,
}
struct ConnectionInternal {
reassembler: Reassembler,
bt: Bluetooth,
+ close_tx: oneshot::Sender<()>,
+}
+
+impl Connection {
+ /// Close this connection. Consumes self.
+ pub async fn close(self) {
+ let (tx, rx) = oneshot::channel();
+ self.requests
+ .send(Request::Unregister(UnregisterRequest { handle: self.handle, fut: tx }))
+ .await
+ .unwrap();
+ rx.await.unwrap()
+ }
}
/// Manages rx and tx for open ACL connections
#[derive(Clone, Stoppable)]
pub struct AclDispatch {
- requests: Sender<RegistrationRequest>,
+ requests: Sender<Request>,
}
impl AclDispatch {
/// Register the provided connection with the ACL dispatch
pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
let (tx, rx) = oneshot::channel();
- self.requests.send(RegistrationRequest { handle, bt, fut: tx }).await.unwrap();
+ self.requests
+ .send(Request::Register(RegisterRequest { handle, bt, fut: tx }))
+ .await
+ .unwrap();
rx.await.unwrap()
}
}
#[derive(Debug)]
-struct RegistrationRequest {
+enum Request {
+ Register(RegisterRequest),
+ Unregister(UnregisterRequest),
+}
+
+#[derive(Debug)]
+struct RegisterRequest {
handle: u16,
bt: Bluetooth,
fut: oneshot::Sender<Connection>,
}
+#[derive(Debug)]
+struct UnregisterRequest {
+ handle: u16,
+ fut: oneshot::Sender<()>,
+}
+
const QCOM_DEBUG_HANDLE: u16 = 0xedc;
#[provides]
mut events: EventRegistry,
rt: Arc<Runtime>,
) -> AclDispatch {
- let (req_tx, mut req_rx) = channel::<RegistrationRequest>(10);
+ let (req_tx, mut req_rx) = channel::<Request>(10);
+ let req_tx_clone = req_tx.clone();
rt.spawn(async move {
let mut connections: HashMap<u16, ConnectionInternal> = HashMap::new();
loop {
select! {
Some(req) = req_rx.recv() => {
- let (out_tx, out_rx) = channel(10);
- let (in_tx, in_rx) = channel(10);
-
- assert!(connections.insert(
- req.handle,
- ConnectionInternal {
- reassembler: Reassembler::new(out_tx),
- bt: req.bt,
- }).is_none());
-
- match req.bt {
- Classic => {
- classic_outbound.push(fragmenting_stream(
- in_rx, controller.acl_buffer_length.into(), req.handle, req.bt));
- },
- Le => {
- le_outbound.push(fragmenting_stream(
- in_rx, controller.le_buffer_length.into(), req.handle, req.bt));
+ match req {
+ Request::Register(req) => {
+ let (out_tx, out_rx) = channel(10);
+ let (in_tx, in_rx) = channel(10);
+ let (close_tx, close_rx) = oneshot::channel();
+
+ assert!(connections.insert(
+ req.handle,
+ ConnectionInternal {
+ reassembler: Reassembler::new(out_tx),
+ bt: req.bt,
+ close_tx,
+ }).is_none());
+
+ match req.bt {
+ Classic => {
+ classic_outbound.push(fragmenting_stream(
+ in_rx, controller.acl_buffer_length.into(), req.handle, req.bt, close_rx));
+ },
+ Le => {
+ le_outbound.push(fragmenting_stream(
+ in_rx, controller.le_buffer_length.into(), req.handle, req.bt, close_rx));
+ },
+ }
+
+ req.fut.send(Connection {
+ rx: out_rx,
+ tx: in_tx,
+ handle: req.handle,
+ requests: req_tx_clone.clone()}).unwrap();
},
+ Request::Unregister(req) => {
+ if let Some(connection) = connections.remove(&req.handle) {
+ connection.close_tx.send(()).unwrap();
+ }
+ }
}
-
- req.fut.send(Connection { rx: out_rx, tx: in_tx }).unwrap();
},
Some(packet) = consume(&acl.rx) => {
match connections.get_mut(&packet.get_handle()) {