use bt_hci_proto::empty::Empty;
use bt_hci_proto::facade::*;
use bt_hci_proto::facade_grpc::{create_hci_layer_facade, HciLayerFacade};
+use bt_packet::HciEvent;
use futures::prelude::*;
use futures::sink::SinkExt;
use gddi::{module, provides};
use log::error;
use std::sync::Arc;
use tokio::runtime::Runtime;
+use tokio::sync::mpsc::{channel, Receiver, Sender};
+use tokio::sync::Mutex;
module! {
facade_module,
#[provides]
async fn provide_facade(hci_exports: HciExports, rt: Arc<Runtime>) -> HciLayerFacadeService {
- HciLayerFacadeService { hci_exports, rt }
+ let (from_hci_evt_tx, to_grpc_evt_rx) = channel::<HciEvent>(10);
+ HciLayerFacadeService {
+ hci_exports,
+ rt,
+ from_hci_evt_tx,
+ to_grpc_evt_rx: Arc::new(Mutex::new(to_grpc_evt_rx)),
+ }
}
/// HCI layer facade service
pub struct HciLayerFacadeService {
hci_exports: HciExports,
rt: Arc<Runtime>,
+ from_hci_evt_tx: Sender<HciEvent>,
+ to_grpc_evt_rx: Arc<Mutex<Receiver<HciEvent>>>,
}
impl GrpcFacade for HciLayerFacadeService {
fn request_event(&mut self, ctx: RpcContext<'_>, code: EventRequest, sink: UnarySink<Empty>) {
self.rt.block_on(
self.hci_exports
- .register_event_handler(code.get_code() as u8),
+ .register_event_handler(code.get_code() as u8, self.from_hci_evt_tx.clone()),
);
let f = sink
_req: Empty,
mut resp: ServerStreamingSink<Event>,
) {
- let evt_rx = self.hci_exports.evt_rx.clone();
+ let evt_rx = self.to_grpc_evt_rx.clone();
self.rt.spawn(async move {
while let Some(event) = evt_rx.lock().await.recv().await {
use error::Result;
use facade::facade_module;
use gddi::{module, provides};
-use std::collections::HashSet;
+use std::collections::HashMap;
use std::sync::Arc;
use tokio::runtime::Runtime;
-use tokio::sync::mpsc::{channel, Receiver, Sender};
-use tokio::sync::{mpsc, oneshot, Mutex};
+use tokio::select;
+use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedReceiver, UnboundedSender};
+use tokio::sync::{oneshot, Mutex};
module! {
hci_module,
#[provides]
async fn provide_hci(hal_exports: HalExports, rt: Arc<Runtime>) -> HciExports {
- let (evt_tx, evt_rx) = channel::<HciEvent>(10);
- let (cmd_tx, cmd_rx) = mpsc::channel::<HciCommandEntry>(10);
- let hashset = Arc::new(Mutex::new(HashSet::new()));
- let pending_cmds = Arc::new(Mutex::new(Vec::new()));
-
- rt.spawn(on_event(
- pending_cmds.clone(),
- evt_tx.clone(),
+ let (cmd_tx, cmd_rx) = channel::<Command>(10);
+ let evt_handlers = Arc::new(Mutex::new(HashMap::new()));
+
+ rt.spawn(dispatch(
+ evt_handlers.clone(),
hal_exports.evt_rx,
+ hal_exports.cmd_tx,
+ cmd_rx,
));
- rt.spawn(on_command(pending_cmds, hal_exports.cmd_tx, cmd_rx));
-
- let evt_rx = Arc::new(Mutex::new(evt_rx));
HciExports {
cmd_tx,
- registered_events: Arc::clone(&hashset),
- evt_tx,
- evt_rx,
+ evt_handlers,
}
}
/// Uses a oneshot channel to wait until the event corresponding
/// to the command is received
#[derive(Debug)]
-pub struct HciCommandEntry {
- /// The HCI command to send
+struct Command {
cmd: HciCommand,
- /// Transmit half of the oneshot
fut: oneshot::Sender<HciCommand>,
}
#[derive(Debug)]
-struct HciCommandEntryInner {
+struct PendingCommand {
opcode: u16,
fut: oneshot::Sender<HciCommand>,
}
#[derive(Clone)]
pub struct HciExports {
/// Transmit end of a channel used to send HCI commands
- cmd_tx: Sender<HciCommandEntry>,
- registered_events: Arc<Mutex<HashSet<u8>>>,
- evt_tx: Sender<HciEvent>,
- /// Receive channel half used to receive HCI events from the HAL
- pub evt_rx: Arc<Mutex<Receiver<HciEvent>>>,
+ cmd_tx: Sender<Command>,
+ evt_handlers: Arc<Mutex<HashMap<u8, Sender<HciEvent>>>>,
}
impl HciExports {
- /// Send the HCI command
async fn send(&mut self, cmd: HciCommand) -> Result<HciEvent> {
let (tx, rx) = oneshot::channel::<HciEvent>();
- self.cmd_tx.send(HciCommandEntry { cmd, fut: tx }).await?;
+ self.cmd_tx.send(Command { cmd, fut: tx }).await?;
let event = rx.await?;
Ok(event)
}
- /// Send the HCI event
- async fn dispatch_event(&mut self, event: HciEvent) -> Result<()> {
- let evt_code = bt_packet::get_evt_code(&event);
- if let Some(evt_code) = evt_code {
- let registered_events = self.registered_events.lock().await;
- if registered_events.contains(&evt_code) {
- self.evt_tx.send(event).await?;
- }
- }
- Ok(())
- }
-
/// Enqueue an HCI command expecting a command complete
/// response from the controller
- pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) {
- let event = self.send(cmd).await.unwrap();
- self.dispatch_event(event).await.unwrap();
+ pub async fn enqueue_command_with_complete(&mut self, cmd: HciCommand) -> HciEvent {
+ self.send(cmd).await.unwrap()
}
/// Enqueue an HCI command expecting a status response
/// from the controller
- pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) {
- let event = self.send(cmd).await.unwrap();
- self.dispatch_event(event).await.unwrap();
+ pub async fn enqueue_command_with_status(&mut self, cmd: HciCommand) -> HciEvent {
+ self.send(cmd).await.unwrap()
}
/// Indicate interest in specific HCI events
- // TODO(qasimj): Add Sender<HciEvent> as an argument so that the calling
- // code can register its own event handler
- pub async fn register_event_handler(&mut self, evt_code: u8) {
- let mut registered_events = self.registered_events.lock().await;
- registered_events.insert(evt_code);
+ pub async fn register_event_handler(&mut self, evt_code: u8, sender: Sender<HciEvent>) {
+ self.evt_handlers.lock().await.insert(evt_code, sender);
}
}
-async fn on_event(
- pending_cmds: Arc<Mutex<Vec<HciCommandEntryInner>>>,
- evt_tx: Sender<HciEvent>,
- evt_rx: Arc<Mutex<mpsc::UnboundedReceiver<HciEvent>>>,
+async fn dispatch(
+ evt_handlers: Arc<Mutex<HashMap<u8, Sender<HciEvent>>>>,
+ evt_rx: Arc<Mutex<UnboundedReceiver<HciEvent>>>,
+ cmd_tx: UnboundedSender<HciCommand>,
+ mut cmd_rx: Receiver<Command>,
) {
- while let Some(evt) = evt_rx.lock().await.recv().await {
- let opcode = bt_packet::get_evt_opcode(&evt).unwrap();
- let mut pending_cmds = pending_cmds.lock().await;
- if let Some(pending_cmd) = remove_first(&mut pending_cmds, |entry| entry.opcode == opcode) {
- pending_cmd.fut.send(evt).unwrap();
- } else {
- evt_tx.send(evt).await.unwrap();
+ let mut pending_cmds: Vec<PendingCommand> = Vec::new();
+ loop {
+ select! {
+ Some(evt) = consume(&evt_rx) => {
+ let opcode = bt_packet::get_evt_opcode(&evt).unwrap();
+ let evt_code = bt_packet::get_evt_code(&evt).unwrap();
+ if let Some(pending_cmd) = remove_first(&mut pending_cmds, |entry| entry.opcode == opcode) {
+ pending_cmd.fut.send(evt).unwrap();
+ } else if let Some(sender) = evt_handlers.lock().await.get(&evt_code) {
+ sender.send(evt).await.unwrap();
+ }
+ },
+ Some(cmd) = cmd_rx.recv() => {
+ pending_cmds.push(PendingCommand {
+ opcode: bt_packet::get_cmd_opcode(&cmd.cmd).unwrap(),
+ fut: cmd.fut,
+ });
+ cmd_tx.send(cmd.cmd).unwrap();
+ },
+ else => break,
}
}
}
-async fn on_command(
- pending_cmds: Arc<Mutex<Vec<HciCommandEntryInner>>>,
- cmd_tx: mpsc::UnboundedSender<HciCommand>,
- mut cmd_rx: mpsc::Receiver<HciCommandEntry>,
-) {
- while let Some(cmd) = cmd_rx.recv().await {
- let mut pending_cmds = pending_cmds.lock().await;
- pending_cmds.push(HciCommandEntryInner {
- opcode: bt_packet::get_cmd_opcode(&cmd.cmd).unwrap(),
- fut: cmd.fut,
- });
- cmd_tx.send(cmd.cmd).unwrap();
- }
+async fn consume(evt_rx: &Arc<Mutex<UnboundedReceiver<HciEvent>>>) -> Option<HciEvent> {
+ evt_rx.lock().await.recv().await
}
fn remove_first<T, P>(vec: &mut Vec<T>, predicate: P) -> Option<T>