OSDN Git Service

rusty-gd: reorg hci internals
authorZach Johnson <zachoverflow@google.com>
Thu, 3 Dec 2020 00:30:29 +0000 (16:30 -0800)
committerZach Johnson <zachoverflow@google.com>
Thu, 10 Dec 2020 17:37:56 +0000 (09:37 -0800)
with the intent of adding dynamically registered event receivers:

1. combine incoming and outgoing into a single selected dispatch (to
eliminate need to lock on pending commands)

2. add dynamically registered event receivers

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost
Change-Id: I038364bc8d99b7ead4688f7570b79826fa7a1651

gd/rust/hci/src/facade.rs
gd/rust/hci/src/lib.rs

index 4667da4..29d6e07 100644 (file)
@@ -5,6 +5,7 @@ use bt_common::GrpcFacade;
 use bt_hci_proto::empty::Empty;
 use bt_hci_proto::facade::*;
 use bt_hci_proto::facade_grpc::{create_hci_layer_facade, HciLayerFacade};
+use bt_packet::HciEvent;
 use futures::prelude::*;
 use futures::sink::SinkExt;
 use gddi::{module, provides};
@@ -12,6 +13,8 @@ use grpcio::*;
 use log::error;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::Mutex;
 
 module! {
     facade_module,
@@ -22,7 +25,13 @@ module! {
 
 #[provides]
 async fn provide_facade(hci_exports: HciExports, rt: Arc<Runtime>) -> HciLayerFacadeService {
-    HciLayerFacadeService { hci_exports, rt }
+    let (from_hci_evt_tx, to_grpc_evt_rx) = channel::<HciEvent>(10);
+    HciLayerFacadeService {
+        hci_exports,
+        rt,
+        from_hci_evt_tx,
+        to_grpc_evt_rx: Arc::new(Mutex::new(to_grpc_evt_rx)),
+    }
 }
 
 /// HCI layer facade service
@@ -30,6 +39,8 @@ async fn provide_facade(hci_exports: HciExports, rt: Arc<Runtime>) -> HciLayerFa
 pub struct HciLayerFacadeService {
     hci_exports: HciExports,
     rt: Arc<Runtime>,
+    from_hci_evt_tx: Sender<HciEvent>,
+    to_grpc_evt_rx: Arc<Mutex<Receiver<HciEvent>>>,
 }
 
 impl GrpcFacade for HciLayerFacadeService {
@@ -90,7 +101,7 @@ impl HciLayerFacade for HciLayerFacadeService {
     fn request_event(&mut self, ctx: RpcContext<'_>, code: EventRequest, sink: UnarySink<Empty>) {
         self.rt.block_on(
             self.hci_exports
-                .register_event_handler(code.get_code() as u8),
+                .register_event_handler(code.get_code() as u8, self.from_hci_evt_tx.clone()),
         );
 
         let f = sink
@@ -125,7 +136,7 @@ impl HciLayerFacade for HciLayerFacadeService {
         _req: Empty,
         mut resp: ServerStreamingSink<Event>,
     ) {
-        let evt_rx = self.hci_exports.evt_rx.clone();
+        let evt_rx = self.to_grpc_evt_rx.clone();
 
         self.rt.spawn(async move {
             while let Some(event) = evt_rx.lock().await.recv().await {
index dc4ae56..e1b475f 100644 (file)
@@ -11,11 +11,12 @@ use bt_packet::{HciCommand, HciEvent};
 use error::Result;
 use facade::facade_module;
 use gddi::{module, provides};
-use std::collections::HashSet;
+use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::runtime::Runtime;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::sync::{mpsc, oneshot, Mutex};
+use tokio::select;
+use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::sync::{oneshot, Mutex};
 
 module! {
     hci_module,
@@ -29,25 +30,19 @@ module! {
 
 #[provides]
 async fn provide_hci(hal_exports: HalExports, rt: Arc<Runtime>) -> HciExports {
-    let (evt_tx, evt_rx) = channel::<HciEvent>(10);
-    let (cmd_tx, cmd_rx) = mpsc::channel::<HciCommandEntry>(10);
-    let hashset = Arc::new(Mutex::new(HashSet::new()));
-    let pending_cmds = Arc::new(Mutex::new(Vec::new()));
-
-    rt.spawn(on_event(
-        pending_cmds.clone(),
-        evt_tx.clone(),
+    let (cmd_tx, cmd_rx) = channel::<Command>(10);
+    let evt_handlers = Arc::new(Mutex::new(HashMap::new()));
+
+    rt.spawn(dispatch(
+        evt_handlers.clone(),
         hal_exports.evt_rx,
+        hal_exports.cmd_tx,
+        cmd_rx,
     ));
-    rt.spawn(on_command(pending_cmds, hal_exports.cmd_tx, cmd_rx));
-
-    let evt_rx = Arc::new(Mutex::new(evt_rx));
 
     HciExports {
         cmd_tx,
-        registered_events: Arc::clone(&hashset),
-        evt_tx,
-        evt_rx,
+        evt_handlers,
     }
 }
 
@@ -55,15 +50,13 @@ async fn provide_hci(hal_exports: HalExports, rt: Arc<Runtime>) -> HciExports {
 /// Uses a oneshot channel to wait until the event corresponding
 /// to the command is received
 #[derive(Debug)]
-pub struct HciCommandEntry {
-    /// The HCI command to send
+struct Command {
     cmd: HciCommand,
-    /// Transmit half of the oneshot
     fut: oneshot::Sender<HciCommand>,
 }
 
 #[derive(Debug)]
-struct HciCommandEntryInner {
+struct PendingCommand {
     opcode: u16,
     fut: oneshot::Sender<HciCommand>,
 }
@@ -72,86 +65,68 @@ struct HciCommandEntryInner {
 #[derive(Clone)]
 pub struct HciExports {
     /// Transmit end of a channel used to send HCI commands
-    cmd_tx: Sender<HciCommandEntry>,
-    registered_events: Arc<Mutex<HashSet<u8>>>,
-    evt_tx: Sender<HciEvent>,
-    /// Receive channel half used to receive HCI events from the HAL
-    pub evt_rx: Arc<Mutex<Receiver<HciEvent>>>,
+    cmd_tx: Sender<Command>,
+    evt_handlers: Arc<Mutex<HashMap<u8, Sender<HciEvent>>>>,
 }
 
 impl HciExports {
-    /// Send the HCI command
     async fn send(&mut self, cmd: HciCommand) -> Result<HciEvent> {
         let (tx, rx) = oneshot::channel::<HciEvent>();
-        self.cmd_tx.send(HciCommandEntry { cmd, fut: tx }).await?;
+        self.cmd_tx.send(Command { cmd, fut: tx }).await?;
         let event = rx.await?;
         Ok(event)
     }
 
-    /// Send the HCI event
-    async fn dispatch_event(&mut self, event: HciEvent) -> Result<()> {
-        let evt_code = bt_packet::get_evt_code(&event);
-        if let Some(evt_code) = evt_code {
-            let registered_events = self.registered_events.lock().await;
-            if registered_events.contains(&evt_code) {
-                self.evt_tx.send(event).await?;
-            }
-        }
-        Ok(())
-    }
-
     /// Enqueue an HCI command expecting a command complete
     /// response from the controller
-    pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) {
-        let event = self.send(cmd).await.unwrap();
-        self.dispatch_event(event).await.unwrap();
+    pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) -> HciEvent {
+        self.send(cmd).await.unwrap()
     }
 
     /// Enqueue an HCI command expecting a status response
     /// from the controller
-    pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) {
-        let event = self.send(cmd).await.unwrap();
-        self.dispatch_event(event).await.unwrap();
+    pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) -> HciEvent {
+        self.send(cmd).await.unwrap()
     }
 
     /// Indicate interest in specific HCI events
-    // TODO(qasimj): Add Sender<HciEvent> as an argument so that the calling
-    // code can register its own event handler
-    pub async fn register_event_handler(&mut self, evt_code: u8) {
-        let mut registered_events = self.registered_events.lock().await;
-        registered_events.insert(evt_code);
+    pub async fn register_event_handler(&mut self, evt_code: u8, sender: Sender<HciEvent>) {
+        self.evt_handlers.lock().await.insert(evt_code, sender);
     }
 }
 
-async fn on_event(
-    pending_cmds: Arc<Mutex<Vec<HciCommandEntryInner>>>,
-    evt_tx: Sender<HciEvent>,
-    evt_rx: Arc<Mutex<mpsc::UnboundedReceiver<HciEvent>>>,
+async fn dispatch(
+    evt_handlers: Arc<Mutex<HashMap<u8, Sender<HciEvent>>>>,
+    evt_rx: Arc<Mutex<UnboundedReceiver<HciEvent>>>,
+    cmd_tx: UnboundedSender<HciCommand>,
+    mut cmd_rx: Receiver<Command>,
 ) {
-    while let Some(evt) = evt_rx.lock().await.recv().await {
-        let opcode = bt_packet::get_evt_opcode(&evt).unwrap();
-        let mut pending_cmds = pending_cmds.lock().await;
-        if let Some(pending_cmd) = remove_first(&mut pending_cmds, |entry| entry.opcode == opcode) {
-            pending_cmd.fut.send(evt).unwrap();
-        } else {
-            evt_tx.send(evt).await.unwrap();
+    let mut pending_cmds: Vec<PendingCommand> = Vec::new();
+    loop {
+        select! {
+            Some(evt) = consume(&evt_rx) => {
+                let opcode = bt_packet::get_evt_opcode(&evt).unwrap();
+                let evt_code = bt_packet::get_evt_code(&evt).unwrap();
+                if let Some(pending_cmd) = remove_first(&mut pending_cmds, |entry| entry.opcode == opcode) {
+                    pending_cmd.fut.send(evt).unwrap();
+                } else if let Some(sender) = evt_handlers.lock().await.get(&evt_code) {
+                    sender.send(evt).await.unwrap();
+                }
+            },
+            Some(cmd) = cmd_rx.recv() => {
+                pending_cmds.push(PendingCommand {
+                    opcode: bt_packet::get_cmd_opcode(&cmd.cmd).unwrap(),
+                    fut: cmd.fut,
+                });
+                cmd_tx.send(cmd.cmd).unwrap();
+            },
+            else => break,
         }
     }
 }
 
-async fn on_command(
-    pending_cmds: Arc<Mutex<Vec<HciCommandEntryInner>>>,
-    cmd_tx: mpsc::UnboundedSender<HciCommand>,
-    mut cmd_rx: mpsc::Receiver<HciCommandEntry>,
-) {
-    while let Some(cmd) = cmd_rx.recv().await {
-        let mut pending_cmds = pending_cmds.lock().await;
-        pending_cmds.push(HciCommandEntryInner {
-            opcode: bt_packet::get_cmd_opcode(&cmd.cmd).unwrap(),
-            fut: cmd.fut,
-        });
-        cmd_tx.send(cmd.cmd).unwrap();
-    }
+async fn consume(evt_rx: &Arc<Mutex<UnboundedReceiver<HciEvent>>>) -> Option<HciEvent> {
+    evt_rx.lock().await.recv().await
 }
 
 fn remove_first<T, P>(vec: &mut Vec<T>, predicate: P) -> Option<T>