From: Zach Johnson Date: Thu, 3 Dec 2020 00:30:29 +0000 (-0800) Subject: rusty-gd: reorg hci internals X-Git-Url: http://git.osdn.net/view?a=commitdiff_plain;h=e66d2d8682c6584b108314d85ee415f3decd1c27;p=android-x86%2Fsystem-bt.git rusty-gd: reorg hci internals with the intent of adding dynamically registered event receivers: 1. combine incoming and outgoing into a single selected dispatch (to eliminate need to lock on pending commands) 2. add dynamically registered event receivers Bug: 171749953 Tag: #gd-refactor Test: gd/cert/run --rhost Change-Id: I038364bc8d99b7ead4688f7570b79826fa7a1651 --- diff --git a/gd/rust/hci/src/facade.rs b/gd/rust/hci/src/facade.rs index 4667da400..29d6e070a 100644 --- a/gd/rust/hci/src/facade.rs +++ b/gd/rust/hci/src/facade.rs @@ -5,6 +5,7 @@ use bt_common::GrpcFacade; use bt_hci_proto::empty::Empty; use bt_hci_proto::facade::*; use bt_hci_proto::facade_grpc::{create_hci_layer_facade, HciLayerFacade}; +use bt_packet::HciEvent; use futures::prelude::*; use futures::sink::SinkExt; use gddi::{module, provides}; @@ -12,6 +13,8 @@ use grpcio::*; use log::error; use std::sync::Arc; use tokio::runtime::Runtime; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tokio::sync::Mutex; module! { facade_module, @@ -22,7 +25,13 @@ module! { #[provides] async fn provide_facade(hci_exports: HciExports, rt: Arc) -> HciLayerFacadeService { - HciLayerFacadeService { hci_exports, rt } + let (from_hci_evt_tx, to_grpc_evt_rx) = channel::(10); + HciLayerFacadeService { + hci_exports, + rt, + from_hci_evt_tx, + to_grpc_evt_rx: Arc::new(Mutex::new(to_grpc_evt_rx)), + } } /// HCI layer facade service @@ -30,6 +39,8 @@ async fn provide_facade(hci_exports: HciExports, rt: Arc) -> HciLayerFa pub struct HciLayerFacadeService { hci_exports: HciExports, rt: Arc, + from_hci_evt_tx: Sender, + to_grpc_evt_rx: Arc>>, } impl GrpcFacade for HciLayerFacadeService { @@ -90,7 +101,7 @@ impl HciLayerFacade for HciLayerFacadeService { fn request_event(&mut self, ctx: RpcContext<'_>, code: EventRequest, sink: UnarySink) { self.rt.block_on( self.hci_exports - .register_event_handler(code.get_code() as u8), + .register_event_handler(code.get_code() as u8, self.from_hci_evt_tx.clone()), ); let f = sink @@ -125,7 +136,7 @@ impl HciLayerFacade for HciLayerFacadeService { _req: Empty, mut resp: ServerStreamingSink, ) { - let evt_rx = self.hci_exports.evt_rx.clone(); + let evt_rx = self.to_grpc_evt_rx.clone(); self.rt.spawn(async move { while let Some(event) = evt_rx.lock().await.recv().await { diff --git a/gd/rust/hci/src/lib.rs b/gd/rust/hci/src/lib.rs index dc4ae5616..e1b475f6a 100644 --- a/gd/rust/hci/src/lib.rs +++ b/gd/rust/hci/src/lib.rs @@ -11,11 +11,12 @@ use bt_packet::{HciCommand, HciEvent}; use error::Result; use facade::facade_module; use gddi::{module, provides}; -use std::collections::HashSet; +use std::collections::HashMap; use std::sync::Arc; use tokio::runtime::Runtime; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::{mpsc, oneshot, Mutex}; +use tokio::select; +use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender}; +use tokio::sync::{oneshot, Mutex}; module! { hci_module, @@ -29,25 +30,19 @@ module! { #[provides] async fn provide_hci(hal_exports: HalExports, rt: Arc) -> HciExports { - let (evt_tx, evt_rx) = channel::(10); - let (cmd_tx, cmd_rx) = mpsc::channel::(10); - let hashset = Arc::new(Mutex::new(HashSet::new())); - let pending_cmds = Arc::new(Mutex::new(Vec::new())); - - rt.spawn(on_event( - pending_cmds.clone(), - evt_tx.clone(), + let (cmd_tx, cmd_rx) = channel::(10); + let evt_handlers = Arc::new(Mutex::new(HashMap::new())); + + rt.spawn(dispatch( + evt_handlers.clone(), hal_exports.evt_rx, + hal_exports.cmd_tx, + cmd_rx, )); - rt.spawn(on_command(pending_cmds, hal_exports.cmd_tx, cmd_rx)); - - let evt_rx = Arc::new(Mutex::new(evt_rx)); HciExports { cmd_tx, - registered_events: Arc::clone(&hashset), - evt_tx, - evt_rx, + evt_handlers, } } @@ -55,15 +50,13 @@ async fn provide_hci(hal_exports: HalExports, rt: Arc) -> HciExports { /// Uses a oneshot channel to wait until the event corresponding /// to the command is received #[derive(Debug)] -pub struct HciCommandEntry { - /// The HCI command to send +struct Command { cmd: HciCommand, - /// Transmit half of the oneshot fut: oneshot::Sender, } #[derive(Debug)] -struct HciCommandEntryInner { +struct PendingCommand { opcode: u16, fut: oneshot::Sender, } @@ -72,86 +65,68 @@ struct HciCommandEntryInner { #[derive(Clone)] pub struct HciExports { /// Transmit end of a channel used to send HCI commands - cmd_tx: Sender, - registered_events: Arc>>, - evt_tx: Sender, - /// Receive channel half used to receive HCI events from the HAL - pub evt_rx: Arc>>, + cmd_tx: Sender, + evt_handlers: Arc>>>, } impl HciExports { - /// Send the HCI command async fn send(&mut self, cmd: HciCommand) -> Result { let (tx, rx) = oneshot::channel::(); - self.cmd_tx.send(HciCommandEntry { cmd, fut: tx }).await?; + self.cmd_tx.send(Command { cmd, fut: tx }).await?; let event = rx.await?; Ok(event) } - /// Send the HCI event - async fn dispatch_event(&mut self, event: HciEvent) -> Result<()> { - let evt_code = bt_packet::get_evt_code(&event); - if let Some(evt_code) = evt_code { - let registered_events = self.registered_events.lock().await; - if registered_events.contains(&evt_code) { - self.evt_tx.send(event).await?; - } - } - Ok(()) - } - /// Enqueue an HCI command expecting a command complete /// response from the controller - pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) { - let event = self.send(cmd).await.unwrap(); - self.dispatch_event(event).await.unwrap(); + pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) -> HciEvent { + self.send(cmd).await.unwrap() } /// Enqueue an HCI command expecting a status response /// from the controller - pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) { - let event = self.send(cmd).await.unwrap(); - self.dispatch_event(event).await.unwrap(); + pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) -> HciEvent { + self.send(cmd).await.unwrap() } /// Indicate interest in specific HCI events - // TODO(qasimj): Add Sender as an argument so that the calling - // code can register its own event handler - pub async fn register_event_handler(&mut self, evt_code: u8) { - let mut registered_events = self.registered_events.lock().await; - registered_events.insert(evt_code); + pub async fn register_event_handler(&mut self, evt_code: u8, sender: Sender) { + self.evt_handlers.lock().await.insert(evt_code, sender); } } -async fn on_event( - pending_cmds: Arc>>, - evt_tx: Sender, - evt_rx: Arc>>, +async fn dispatch( + evt_handlers: Arc>>>, + evt_rx: Arc>>, + cmd_tx: UnboundedSender, + mut cmd_rx: Receiver, ) { - while let Some(evt) = evt_rx.lock().await.recv().await { - let opcode = bt_packet::get_evt_opcode(&evt).unwrap(); - let mut pending_cmds = pending_cmds.lock().await; - if let Some(pending_cmd) = remove_first(&mut pending_cmds, |entry| entry.opcode == opcode) { - pending_cmd.fut.send(evt).unwrap(); - } else { - evt_tx.send(evt).await.unwrap(); + let mut pending_cmds: Vec = Vec::new(); + loop { + select! { + Some(evt) = consume(&evt_rx) => { + let opcode = bt_packet::get_evt_opcode(&evt).unwrap(); + let evt_code = bt_packet::get_evt_code(&evt).unwrap(); + if let Some(pending_cmd) = remove_first(&mut pending_cmds, |entry| entry.opcode == opcode) { + pending_cmd.fut.send(evt).unwrap(); + } else if let Some(sender) = evt_handlers.lock().await.get(&evt_code) { + sender.send(evt).await.unwrap(); + } + }, + Some(cmd) = cmd_rx.recv() => { + pending_cmds.push(PendingCommand { + opcode: bt_packet::get_cmd_opcode(&cmd.cmd).unwrap(), + fut: cmd.fut, + }); + cmd_tx.send(cmd.cmd).unwrap(); + }, + else => break, } } } -async fn on_command( - pending_cmds: Arc>>, - cmd_tx: mpsc::UnboundedSender, - mut cmd_rx: mpsc::Receiver, -) { - while let Some(cmd) = cmd_rx.recv().await { - let mut pending_cmds = pending_cmds.lock().await; - pending_cmds.push(HciCommandEntryInner { - opcode: bt_packet::get_cmd_opcode(&cmd.cmd).unwrap(), - fut: cmd.fut, - }); - cmd_tx.send(cmd.cmd).unwrap(); - } +async fn consume(evt_rx: &Arc>>) -> Option { + evt_rx.lock().await.recv().await } fn remove_first(vec: &mut Vec, predicate: P) -> Option