OSDN Git Service

rusty-gd: move acl core from lib.rs to core.rs
authorZach Johnson <zachoverflow@google.com>
Wed, 30 Dec 2020 22:48:28 +0000 (14:48 -0800)
committerZach Johnson <zachoverflow@google.com>
Wed, 20 Jan 2021 00:55:52 +0000 (16:55 -0800)
this allows internals to stay internal

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost
Change-Id: Ifc9af8a5403804a5cf15b39abf979c1dc570c7bc

gd/rust/acl/src/core.rs [new file with mode: 0644]
gd/rust/acl/src/lib.rs

diff --git a/gd/rust/acl/src/core.rs b/gd/rust/acl/src/core.rs
new file mode 100644 (file)
index 0000000..a29d817
--- /dev/null
@@ -0,0 +1,207 @@
+//! ACL core dispatch shared between LE and classic
+
+use crate::fragment::{fragmenting_stream, Reassembler};
+use bt_common::Bluetooth::{self, Classic, Le};
+use bt_hal::AclHal;
+use bt_hci::{ControllerExports, EventRegistry};
+use bt_packets::hci::EventChild::{DisconnectionComplete, NumberOfCompletedPackets};
+use bt_packets::hci::{AclPacket, ErrorCode, EventCode};
+use bytes::Bytes;
+use futures::stream::{SelectAll, StreamExt};
+use gddi::{module, provides, Stoppable};
+use log::info;
+use std::collections::HashMap;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+use tokio::select;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::{oneshot, Mutex};
+
+module! {
+    core_module,
+    providers {
+        AclDispatch => provide_acl_dispatch,
+    },
+}
+
+/// A basic ACL connection
+#[derive(Debug)]
+pub struct Connection {
+    rx: Receiver<Bytes>,
+    tx: Sender<Bytes>,
+    handle: u16,
+    requests: Sender<Request>,
+    evt_rx: Receiver<Event>,
+}
+
+struct ConnectionInternal {
+    reassembler: Reassembler,
+    bt: Bluetooth,
+    close_tx: oneshot::Sender<()>,
+    evt_tx: Sender<Event>,
+}
+
+impl Connection {
+    /// Close this connection. Consumes self.
+    #[allow(dead_code)]
+    pub async fn close(self) {
+        let (tx, rx) = oneshot::channel();
+        self.requests.send(Request::Unregister { handle: self.handle, fut: tx }).await.unwrap();
+        rx.await.unwrap()
+    }
+}
+
+/// Events that can be generated by the underlying layer
+#[derive(Debug)]
+pub enum Event {
+    /// Underlying connection was closed. Reports reason why.
+    Closed(ErrorCode),
+}
+
+/// Manages rx and tx for open ACL connections
+#[derive(Clone, Stoppable)]
+pub struct AclDispatch {
+    requests: Sender<Request>,
+}
+
+impl AclDispatch {
+    /// Register the provided connection with the ACL dispatch
+    #[allow(dead_code)]
+    pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
+        let (tx, rx) = oneshot::channel();
+        self.requests.send(Request::Register { handle, bt, fut: tx }).await.unwrap();
+        rx.await.unwrap()
+    }
+}
+
+#[derive(Debug)]
+enum Request {
+    Register { handle: u16, bt: Bluetooth, fut: oneshot::Sender<Connection> },
+    Unregister { handle: u16, fut: oneshot::Sender<()> },
+}
+
+const QCOM_DEBUG_HANDLE: u16 = 0xedc;
+
+#[provides]
+async fn provide_acl_dispatch(
+    acl: AclHal,
+    controller: Arc<ControllerExports>,
+    mut events: EventRegistry,
+    rt: Arc<Runtime>,
+) -> AclDispatch {
+    let (req_tx, mut req_rx) = channel::<Request>(10);
+    let req_tx_clone = req_tx.clone();
+
+    rt.spawn(async move {
+        let mut connections: HashMap<u16, ConnectionInternal> = HashMap::new();
+        let mut classic_outbound = SelectAll::new();
+        let mut classic_credits = controller.acl_buffers;
+        let mut le_outbound = SelectAll::new();
+        let mut le_credits: u16 = controller.le_buffers.into();
+
+        let (evt_tx, mut evt_rx) = channel(3);
+        events.register(EventCode::NumberOfCompletedPackets, evt_tx.clone()).await;
+        events.register(EventCode::DisconnectionComplete, evt_tx).await;
+
+        loop {
+            select! {
+                Some(req) = req_rx.recv() => {
+                    match req {
+                        Request::Register { handle, bt, fut } => {
+                            let (out_tx, out_rx) = channel(10);
+                            let (in_tx, in_rx) = channel(10);
+                            let (evt_tx, evt_rx) = channel(3);
+                            let (close_tx, close_rx) = oneshot::channel();
+
+                            assert!(connections.insert(
+                                handle,
+                                ConnectionInternal {
+                                    reassembler: Reassembler::new(out_tx),
+                                    bt,
+                                    close_tx,
+                                    evt_tx,
+                                }).is_none());
+
+                            match bt {
+                                Classic => {
+                                    classic_outbound.push(fragmenting_stream(
+                                        in_rx, controller.acl_buffer_length.into(), handle, bt, close_rx));
+                                },
+                                Le => {
+                                    le_outbound.push(fragmenting_stream(
+                                        in_rx, controller.le_buffer_length.into(), handle, bt, close_rx));
+                                },
+                            }
+
+                            fut.send(Connection {
+                                rx: out_rx,
+                                tx: in_tx,
+                                handle,
+                                requests: req_tx_clone.clone(),
+                                evt_rx
+                            }).unwrap();
+                        },
+                        Request::Unregister { handle, fut } => {
+                            if let Some(connection) = connections.remove(&handle) {
+                                connection.close_tx.send(()).unwrap();
+                            }
+                            fut.send(()).unwrap();
+                        }
+                    }
+                },
+                Some(packet) = consume(&acl.rx) => {
+                    match connections.get_mut(&packet.get_handle()) {
+                        Some(connection) => connection.reassembler.on_packet(packet).await,
+                        None if packet.get_handle() == QCOM_DEBUG_HANDLE => {},
+                        None => info!("no acl for {}", packet.get_handle()),
+                    }
+                },
+                Some(packet) = classic_outbound.next(), if classic_credits > 0 => {
+                    acl.tx.send(packet).await.unwrap();
+                    classic_credits -= 1;
+                },
+                Some(packet) = le_outbound.next(), if le_credits > 0 => {
+                    acl.tx.send(packet).await.unwrap();
+                    le_credits -= 1;
+                },
+                Some(evt) = evt_rx.recv() => {
+                    match evt.specialize() {
+                        NumberOfCompletedPackets(evt) => {
+                            for info in evt.get_completed_packets() {
+                                match connections.get(&info.connection_handle) {
+                                    Some(connection) => {
+                                        let credits = info.host_num_of_completed_packets;
+                                        match connection.bt {
+                                            Classic => {
+                                                classic_credits += credits;
+                                                assert!(classic_credits <= controller.acl_buffers);
+                                            },
+                                            Le => {
+                                                le_credits += credits;
+                                                assert!(le_credits <= controller.le_buffers.into());
+                                            },
+                                        }
+                                    },
+                                    None => info!("dropping credits for unknown connection {}", info.connection_handle),
+                                }
+                            }
+                        },
+                        DisconnectionComplete(evt) => {
+                            if let Some(connection) = connections.remove(&evt.get_connection_handle()) {
+                                connection.close_tx.send(()).unwrap();
+                                connection.evt_tx.send(Event::Closed(evt.get_reason())).await.unwrap();
+                            }
+                        },
+                        _ => unimplemented!(),
+                    }
+                },
+            }
+        }
+    });
+
+    AclDispatch { requests: req_tx }
+}
+
+async fn consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
+    rx.lock().await.recv().await
+}
index a6396a1..f413451 100644 (file)
 
 /// Exposes classic ACL functionality
 pub mod classic;
+mod core;
 mod fragment;
 
-use bt_common::Bluetooth::{self, Classic, Le};
-use bt_hal::AclHal;
-use bt_hci::{ControllerExports, EventRegistry};
-use bt_packets::hci::EventChild::{DisconnectionComplete, NumberOfCompletedPackets};
-use bt_packets::hci::{AclPacket, ErrorCode, EventCode};
-use bytes::Bytes;
-use fragment::{fragmenting_stream, Reassembler};
-use futures::stream::{SelectAll, StreamExt};
-use gddi::{module, provides, Stoppable};
-use log::info;
-use std::collections::HashMap;
-use std::sync::Arc;
-use tokio::runtime::Runtime;
-use tokio::select;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::sync::{oneshot, Mutex};
+use gddi::module;
 
 module! {
     acl_module,
     submodules {
         classic::classic_acl_module,
+        core::core_module,
     },
-    providers {
-        AclDispatch => provide_acl_dispatch,
-    },
-}
-
-/// A basic ACL connection
-#[derive(Debug)]
-pub struct Connection {
-    rx: Receiver<Bytes>,
-    tx: Sender<Bytes>,
-    handle: u16,
-    requests: Sender<Request>,
-    evt_rx: Receiver<Event>,
-}
-
-struct ConnectionInternal {
-    reassembler: Reassembler,
-    bt: Bluetooth,
-    close_tx: oneshot::Sender<()>,
-    evt_tx: Sender<Event>,
-}
-
-impl Connection {
-    /// Close this connection. Consumes self.
-    pub async fn close(self) {
-        let (tx, rx) = oneshot::channel();
-        self.requests.send(Request::Unregister { handle: self.handle, fut: tx }).await.unwrap();
-        rx.await.unwrap()
-    }
-}
-
-/// Events that can be generated by the underlying layer
-#[derive(Debug)]
-pub enum Event {
-    /// Underlying connection was closed. Reports reason why.
-    Closed(ErrorCode),
-}
-
-/// Manages rx and tx for open ACL connections
-#[derive(Clone, Stoppable)]
-pub struct AclDispatch {
-    requests: Sender<Request>,
-}
-
-impl AclDispatch {
-    /// Register the provided connection with the ACL dispatch
-    pub async fn register(&mut self, handle: u16, bt: Bluetooth) -> Connection {
-        let (tx, rx) = oneshot::channel();
-        self.requests.send(Request::Register { handle, bt, fut: tx }).await.unwrap();
-        rx.await.unwrap()
-    }
-}
-
-#[derive(Debug)]
-enum Request {
-    Register { handle: u16, bt: Bluetooth, fut: oneshot::Sender<Connection> },
-    Unregister { handle: u16, fut: oneshot::Sender<()> },
-}
-
-const QCOM_DEBUG_HANDLE: u16 = 0xedc;
-
-#[provides]
-async fn provide_acl_dispatch(
-    acl: AclHal,
-    controller: Arc<ControllerExports>,
-    mut events: EventRegistry,
-    rt: Arc<Runtime>,
-) -> AclDispatch {
-    let (req_tx, mut req_rx) = channel::<Request>(10);
-    let req_tx_clone = req_tx.clone();
-
-    rt.spawn(async move {
-        let mut connections: HashMap<u16, ConnectionInternal> = HashMap::new();
-        let mut classic_outbound = SelectAll::new();
-        let mut classic_credits = controller.acl_buffers;
-        let mut le_outbound = SelectAll::new();
-        let mut le_credits: u16 = controller.le_buffers.into();
-
-        let (evt_tx, mut evt_rx) = channel(3);
-        events.register(EventCode::NumberOfCompletedPackets, evt_tx.clone()).await;
-        events.register(EventCode::DisconnectionComplete, evt_tx).await;
-
-        loop {
-            select! {
-                Some(req) = req_rx.recv() => {
-                    match req {
-                        Request::Register { handle, bt, fut } => {
-                            let (out_tx, out_rx) = channel(10);
-                            let (in_tx, in_rx) = channel(10);
-                            let (evt_tx, evt_rx) = channel(3);
-                            let (close_tx, close_rx) = oneshot::channel();
-
-                            assert!(connections.insert(
-                                handle,
-                                ConnectionInternal {
-                                    reassembler: Reassembler::new(out_tx),
-                                    bt,
-                                    close_tx,
-                                    evt_tx,
-                                }).is_none());
-
-                            match bt {
-                                Classic => {
-                                    classic_outbound.push(fragmenting_stream(
-                                        in_rx, controller.acl_buffer_length.into(), handle, bt, close_rx));
-                                },
-                                Le => {
-                                    le_outbound.push(fragmenting_stream(
-                                        in_rx, controller.le_buffer_length.into(), handle, bt, close_rx));
-                                },
-                            }
-
-                            fut.send(Connection {
-                                rx: out_rx,
-                                tx: in_tx,
-                                handle,
-                                requests: req_tx_clone.clone(),
-                                evt_rx
-                            }).unwrap();
-                        },
-                        Request::Unregister { handle, fut } => {
-                            if let Some(connection) = connections.remove(&handle) {
-                                connection.close_tx.send(()).unwrap();
-                            }
-                            fut.send(()).unwrap();
-                        }
-                    }
-                },
-                Some(packet) = consume(&acl.rx) => {
-                    match connections.get_mut(&packet.get_handle()) {
-                        Some(connection) => connection.reassembler.on_packet(packet).await,
-                        None if packet.get_handle() == QCOM_DEBUG_HANDLE => {},
-                        None => info!("no acl for {}", packet.get_handle()),
-                    }
-                },
-                Some(packet) = classic_outbound.next(), if classic_credits > 0 => {
-                    acl.tx.send(packet).await.unwrap();
-                    classic_credits -= 1;
-                },
-                Some(packet) = le_outbound.next(), if le_credits > 0 => {
-                    acl.tx.send(packet).await.unwrap();
-                    le_credits -= 1;
-                },
-                Some(evt) = evt_rx.recv() => {
-                    match evt.specialize() {
-                        NumberOfCompletedPackets(evt) => {
-                            for info in evt.get_completed_packets() {
-                                match connections.get(&info.connection_handle) {
-                                    Some(connection) => {
-                                        let credits = info.host_num_of_completed_packets;
-                                        match connection.bt {
-                                            Classic => {
-                                                classic_credits += credits;
-                                                assert!(classic_credits <= controller.acl_buffers);
-                                            },
-                                            Le => {
-                                                le_credits += credits;
-                                                assert!(le_credits <= controller.le_buffers.into());
-                                            },
-                                        }
-                                    },
-                                    None => info!("dropping credits for unknown connection {}", info.connection_handle),
-                                }
-                            }
-                        },
-                        DisconnectionComplete(evt) => {
-                            if let Some(connection) = connections.remove(&evt.get_connection_handle()) {
-                                connection.close_tx.send(()).unwrap();
-                                connection.evt_tx.send(Event::Closed(evt.get_reason())).await.unwrap();
-                            }
-                        },
-                        _ => unimplemented!(),
-                    }
-                },
-            }
-        }
-    });
-
-    AclDispatch { requests: req_tx }
-}
-
-async fn consume(rx: &Arc<Mutex<Receiver<AclPacket>>>) -> Option<AclPacket> {
-    rx.lock().await.recv().await
 }