From b84fa4dc29aa94cfcc51fdb4856f0deb2e5a7625 Mon Sep 17 00:00:00 2001 From: Zach Johnson Date: Tue, 22 Dec 2020 16:06:15 -0800 Subject: [PATCH] rusty-gd: introduce RxAdapter this consolidates logic for dispatching channels over gRPC or callbacks into legacy Bug: 171749953 Tag: #gd-refactor Test: gd/cert/run --rhost SimpleHalTest Change-Id: I724939c1e55312bfb2c33b906252e6df111bd9f5 --- gd/packet/parser/gen_rust.cc | 5 +++ gd/packet/parser/packet_def.cc | 19 ++++++----- gd/rust/facade_helpers/Android.bp | 18 +++++++++++ gd/rust/facade_helpers/src/lib.rs | 67 ++++++++++++++++++++++++++++++++++++++ gd/rust/hal/src/facade.rs | 2 +- gd/rust/hal/src/hidl_hal.rs | 2 +- gd/rust/hal/src/rootcanal_hal.rs | 2 +- gd/rust/hal/src/snoop.rs | 2 +- gd/rust/hci/Android.bp | 1 + gd/rust/hci/src/facade.rs | 68 ++++++++++----------------------------- gd/rust/shim/Android.bp | 1 + gd/rust/shim/src/hci.rs | 62 ++++++++++++----------------------- 12 files changed, 144 insertions(+), 105 deletions(-) create mode 100644 gd/rust/facade_helpers/Android.bp create mode 100644 gd/rust/facade_helpers/src/lib.rs diff --git a/gd/packet/parser/gen_rust.cc b/gd/packet/parser/gen_rust.cc index 3780d1496..6456b8caa 100644 --- a/gd/packet/parser/gen_rust.cc +++ b/gd/packet/parser/gen_rust.cc @@ -37,6 +37,11 @@ pub enum Error { InvalidPacketError } +pub trait Packet { + fn to_bytes(self) -> Bytes; + fn to_vec(self) -> Vec; +} + )"; } diff --git a/gd/packet/parser/packet_def.cc b/gd/packet/parser/packet_def.cc index ba76f179f..b11c6ed16 100644 --- a/gd/packet/parser/packet_def.cc +++ b/gd/packet/parser/packet_def.cc @@ -1041,22 +1041,25 @@ void PacketDef::GenRustAccessStructImpls(std::ostream& s) const { s << "}"; } - s << "impl " << name_ << "Packet {"; - if (parent_ == nullptr) { - s << "pub fn parse(bytes: &[u8]) -> Result { "; - s << "Ok(Self::new(Arc::new(" << name_ << "Data::parse(bytes)?)))"; - s << "}"; - } + s << "impl Packet for " << name_ << "Packet {"; auto root = GetRootDef(); auto root_accessor = util::CamelCaseToUnderScore(root->name_); - s << "pub fn to_bytes(self) -> Bytes {"; + s << "fn to_bytes(self) -> Bytes {"; s << " let mut buffer = BytesMut::new();"; s << " self." << root_accessor << ".write_to(&mut buffer);"; s << " buffer.freeze()"; s << "}\n"; - s << "pub fn to_vec(self) -> Vec { self.to_bytes().to_vec() }\n"; + s << "fn to_vec(self) -> Vec { self.to_bytes().to_vec() }\n"; + s << "}"; + + s << "impl " << name_ << "Packet {"; + if (parent_ == nullptr) { + s << "pub fn parse(bytes: &[u8]) -> Result { "; + s << "Ok(Self::new(Arc::new(" << name_ << "Data::parse(bytes)?)))"; + s << "}"; + } if (!children_.empty()) { s << " pub fn specialize(&self) -> " << name_ << "Child {"; diff --git a/gd/rust/facade_helpers/Android.bp b/gd/rust/facade_helpers/Android.bp new file mode 100644 index 000000000..5bb852c16 --- /dev/null +++ b/gd/rust/facade_helpers/Android.bp @@ -0,0 +1,18 @@ +rust_library { + name: "libbt_facade_helpers", + defaults: ["gd_rust_defaults"], + crate_name: "bt_facade_helpers", + srcs: ["src/lib.rs"], + edition: "2018", + rustlibs: [ + "libbt_facade_proto", + "libbt_packets", + "libbytes", + "libfutures", + "libgrpcio", + "libtokio", + "libprotobuf", + "liblog_rust", + "libcxx", + ], +} diff --git a/gd/rust/facade_helpers/src/lib.rs b/gd/rust/facade_helpers/src/lib.rs new file mode 100644 index 000000000..fb8b8c822 --- /dev/null +++ b/gd/rust/facade_helpers/src/lib.rs @@ -0,0 +1,67 @@ +//! common facade & shim helpers + +use bt_facade_proto::common::Data; +use bt_packets::hci::Packet; +use futures::sink::SinkExt; +use grpcio::*; +use std::sync::Arc; +use tokio::runtime::Runtime; +use tokio::sync::mpsc::Receiver; +use tokio::sync::Mutex; + +/// Wrapper so we can invoke callbacks +pub trait U8SliceRunnable { + /// Do the thing + fn run(&self, data: &[u8]); +} + +/// Helper for interfacing channels with shim or gRPC boundaries +#[derive(Clone)] +pub struct RxAdapter { + rx: Arc>>, + running: bool, +} + +impl RxAdapter { + /// New, from an unwrapped receiver + pub fn new(rx: Receiver) -> Self { + Self::from_arc(Arc::new(Mutex::new(rx))) + } + + /// New, from an already arc mutexed receiver + pub fn from_arc(rx: Arc>>) -> Self { + Self { rx, running: false } + } + + /// Stream out the channel over the provided sink + pub fn stream_grpc(&mut self, ctx: RpcContext<'_>, mut sink: ServerStreamingSink) { + assert!(!self.running); + self.running = true; + + let clone_rx = self.rx.clone(); + ctx.spawn(async move { + while let Some(payload) = clone_rx.lock().await.recv().await { + let mut data = Data::default(); + data.set_payload(payload.to_vec()); + sink.send((data, WriteFlags::default())).await.unwrap(); + } + }); + } + + /// Stream out the channel over the provided shim runnable + pub fn stream_runnable( + &mut self, + rt: &Arc, + runnable: R, + ) { + assert!(!self.running); + self.running = true; + + let clone_rx = self.rx.clone(); + rt.spawn(async move { + while let Some(payload) = clone_rx.lock().await.recv().await { + runnable.run(&payload.to_bytes()); + } + }); + } +} diff --git a/gd/rust/hal/src/facade.rs b/gd/rust/hal/src/facade.rs index 1f5fb692e..411ada9e1 100644 --- a/gd/rust/hal/src/facade.rs +++ b/gd/rust/hal/src/facade.rs @@ -5,7 +5,7 @@ use bt_common::GrpcFacade; use bt_facade_proto::common::Data; use bt_facade_proto::empty::Empty; use bt_facade_proto::hal_facade_grpc::{create_hci_hal_facade, HciHalFacade}; -use bt_packets::hci::{AclPacket, CommandPacket}; +use bt_packets::hci::{AclPacket, CommandPacket, Packet}; use futures::sink::SinkExt; use gddi::{module, provides, Stoppable}; use grpcio::*; diff --git a/gd/rust/hal/src/hidl_hal.rs b/gd/rust/hal/src/hidl_hal.rs index 4ef2919b7..c5eb838da 100644 --- a/gd/rust/hal/src/hidl_hal.rs +++ b/gd/rust/hal/src/hidl_hal.rs @@ -1,6 +1,6 @@ //! Implementation of the HAl that talks to BT controller over Android's HIDL use crate::internal::{InnerHal, RawHal}; -use bt_packets::hci::{AclPacket, CommandPacket, EventPacket}; +use bt_packets::hci::{AclPacket, CommandPacket, EventPacket, Packet}; use gddi::{module, provides}; use std::sync::Arc; use std::sync::Mutex; diff --git a/gd/rust/hal/src/rootcanal_hal.rs b/gd/rust/hal/src/rootcanal_hal.rs index 3cca452fc..f6cddfc8b 100644 --- a/gd/rust/hal/src/rootcanal_hal.rs +++ b/gd/rust/hal/src/rootcanal_hal.rs @@ -4,7 +4,7 @@ use crate::internal::{InnerHal, RawHal}; use crate::{Result, H4_HEADER_SIZE}; -use bt_packets::hci::{AclPacket, CommandPacket, EventPacket}; +use bt_packets::hci::{AclPacket, CommandPacket, EventPacket, Packet}; use bytes::{BufMut, Bytes, BytesMut}; use gddi::{module, provides, Stoppable}; use num_derive::{FromPrimitive, ToPrimitive}; diff --git a/gd/rust/hal/src/snoop.rs b/gd/rust/hal/src/snoop.rs index fd2859b02..cd7801987 100644 --- a/gd/rust/hal/src/snoop.rs +++ b/gd/rust/hal/src/snoop.rs @@ -2,7 +2,7 @@ use crate::internal::RawHal; use bt_common::sys_prop; -use bt_packets::hci::{AclPacket, CommandPacket, EventPacket}; +use bt_packets::hci::{AclPacket, CommandPacket, EventPacket, Packet}; use bytes::{BufMut, Bytes, BytesMut}; use gddi::{module, part_out, provides, Stoppable}; use log::error; diff --git a/gd/rust/hci/Android.bp b/gd/rust/hci/Android.bp index 2b6e4c907..5dafe155f 100644 --- a/gd/rust/hci/Android.bp +++ b/gd/rust/hci/Android.bp @@ -19,6 +19,7 @@ rust_library { "liblog_rust", "libbt_common", "libbt_hci_custom_types", + "libbt_facade_helpers", ], proc_macros: [ "libnum_derive", diff --git a/gd/rust/hci/src/facade.rs b/gd/rust/hci/src/facade.rs index c07865731..c76df0b61 100644 --- a/gd/rust/hci/src/facade.rs +++ b/gd/rust/hci/src/facade.rs @@ -2,6 +2,7 @@ use crate::{EventRegistry, RawCommandSender}; use bt_common::GrpcFacade; +use bt_facade_helpers::RxAdapter; use bt_facade_proto::common::Data; use bt_facade_proto::empty::Empty; use bt_facade_proto::hci_facade::EventRequest; @@ -10,13 +11,10 @@ use bt_hal::AclHal; use bt_packets::hci::{ AclPacket, CommandPacket, EventCode, EventPacket, LeMetaEventPacket, SubeventCode, }; -use futures::sink::SinkExt; use gddi::{module, provides, Stoppable}; use grpcio::*; use num_traits::FromPrimitive; -use std::sync::Arc; -use tokio::sync::mpsc::{channel, Receiver, Sender}; -use tokio::sync::Mutex; +use tokio::sync::mpsc::{channel, Sender}; module! { facade_module, @@ -36,11 +34,12 @@ async fn provide_facade( HciFacadeService { commands, events, - acl, evt_tx, - evt_rx: Arc::new(Mutex::new(evt_rx)), + evt_rx: RxAdapter::new(evt_rx), le_evt_tx, - le_evt_rx: Arc::new(Mutex::new(le_evt_rx)), + le_evt_rx: RxAdapter::new(le_evt_rx), + acl_tx: acl.tx, + acl_rx: RxAdapter::from_arc(acl.rx), } } @@ -50,11 +49,12 @@ async fn provide_facade( pub struct HciFacadeService { pub commands: RawCommandSender, events: EventRegistry, - pub acl: AclHal, evt_tx: Sender, - pub evt_rx: Arc>>, + pub evt_rx: RxAdapter, le_evt_tx: Sender, - pub le_evt_rx: Arc>>, + pub le_evt_rx: RxAdapter, + pub acl_tx: Sender, + pub acl_rx: RxAdapter, } impl HciFacadeService { @@ -109,61 +109,27 @@ impl HciFacade for HciFacadeService { } fn send_acl(&mut self, ctx: RpcContext<'_>, mut packet: Data, sink: UnarySink) { - let acl_tx = self.acl.tx.clone(); + let acl_tx = self.acl_tx.clone(); ctx.spawn(async move { acl_tx.send(AclPacket::parse(&packet.take_payload()).unwrap()).await.unwrap(); sink.success(Empty::default()).await.unwrap(); }); } - fn stream_events( - &mut self, - ctx: RpcContext<'_>, - _req: Empty, - mut resp: ServerStreamingSink, - ) { - let evt_rx = self.evt_rx.clone(); - - ctx.spawn(async move { - while let Some(event) = evt_rx.lock().await.recv().await { - let mut evt = Data::default(); - evt.set_payload(event.to_vec()); - resp.send((evt, WriteFlags::default())).await.unwrap(); - } - }); + fn stream_events(&mut self, ctx: RpcContext<'_>, _req: Empty, sink: ServerStreamingSink) { + self.evt_rx.stream_grpc(ctx, sink); } fn stream_le_subevents( &mut self, ctx: RpcContext<'_>, _req: Empty, - mut resp: ServerStreamingSink, + sink: ServerStreamingSink, ) { - let evt_rx = self.le_evt_rx.clone(); - - ctx.spawn(async move { - while let Some(event) = evt_rx.lock().await.recv().await { - let mut evt = Data::default(); - evt.set_payload(event.to_vec()); - resp.send((evt, WriteFlags::default())).await.unwrap(); - } - }); + self.le_evt_rx.stream_grpc(ctx, sink); } - fn stream_acl( - &mut self, - ctx: RpcContext<'_>, - _req: Empty, - mut resp: ServerStreamingSink, - ) { - let acl_rx = self.acl.rx.clone(); - - ctx.spawn(async move { - while let Some(data) = acl_rx.lock().await.recv().await { - let mut packet = Data::default(); - packet.set_payload(data.to_vec()); - resp.send((packet, WriteFlags::default())).await.unwrap(); - } - }); + fn stream_acl(&mut self, ctx: RpcContext<'_>, _req: Empty, sink: ServerStreamingSink) { + self.acl_rx.stream_grpc(ctx, sink); } } diff --git a/gd/rust/shim/Android.bp b/gd/rust/shim/Android.bp index 6ae4f4f1d..e315ea704 100644 --- a/gd/rust/shim/Android.bp +++ b/gd/rust/shim/Android.bp @@ -34,6 +34,7 @@ rust_ffi_static { "libnum_traits", "libnix", "liblog_rust", + "libbt_facade_helpers", ], proc_macros: [ "libpaste", diff --git a/gd/rust/shim/src/hci.rs b/gd/rust/shim/src/hci.rs index 4407a168c..f7e255fa7 100644 --- a/gd/rust/shim/src/hci.rs +++ b/gd/rust/shim/src/hci.rs @@ -1,7 +1,8 @@ //! Hci shim +use bt_facade_helpers::U8SliceRunnable; use bt_hci::facade::HciFacadeService; -use bt_packets::hci::{AclPacket, CommandPacket}; +use bt_packets::hci::{AclPacket, CommandPacket, Packet}; use std::sync::Arc; use tokio::runtime::Runtime; @@ -35,23 +36,24 @@ mod ffi { unsafe impl Send for ffi::u8SliceCallback {} unsafe impl Send for ffi::u8SliceOnceCallback {} +struct CallbackWrapper { + cb: cxx::UniquePtr, +} + +impl U8SliceRunnable for CallbackWrapper { + fn run(&self, data: &[u8]) { + self.cb.Run(data); + } +} + pub struct Hci { internal: HciFacadeService, rt: Arc, - acl_callback_set: bool, - evt_callback_set: bool, - le_evt_callback_set: bool, } impl Hci { pub fn new(rt: Arc, internal: HciFacadeService) -> Self { - Self { - rt, - internal, - acl_callback_set: false, - evt_callback_set: false, - le_evt_callback_set: false, - } + Self { rt, internal } } } @@ -69,7 +71,7 @@ pub fn hci_send_command( } pub fn hci_send_acl(hci: &mut Hci, data: &[u8]) { - hci.rt.block_on(hci.internal.acl.tx.send(AclPacket::parse(data).unwrap())).unwrap(); + hci.rt.block_on(hci.internal.acl_tx.send(AclPacket::parse(data).unwrap())).unwrap(); } pub fn hci_register_event(hci: &mut Hci, event: u8) { @@ -80,38 +82,14 @@ pub fn hci_register_le_event(hci: &mut Hci, subevent: u8) { hci.rt.block_on(hci.internal.register_le_event(subevent.into())); } -pub fn hci_set_acl_callback(hci: &mut Hci, callback: cxx::UniquePtr) { - assert!(!hci.acl_callback_set); - hci.acl_callback_set = true; - - let stream = hci.internal.acl.rx.clone(); - hci.rt.spawn(async move { - while let Some(item) = stream.lock().await.recv().await { - callback.Run(&item.to_bytes()); - } - }); +pub fn hci_set_acl_callback(hci: &mut Hci, cb: cxx::UniquePtr) { + hci.internal.acl_rx.stream_runnable(&hci.rt, CallbackWrapper { cb }); } -pub fn hci_set_evt_callback(hci: &mut Hci, callback: cxx::UniquePtr) { - assert!(!hci.evt_callback_set); - hci.evt_callback_set = true; - - let stream = hci.internal.evt_rx.clone(); - hci.rt.spawn(async move { - while let Some(item) = stream.lock().await.recv().await { - callback.Run(&item.to_bytes()); - } - }); +pub fn hci_set_evt_callback(hci: &mut Hci, cb: cxx::UniquePtr) { + hci.internal.evt_rx.stream_runnable(&hci.rt, CallbackWrapper { cb }); } -pub fn hci_set_le_evt_callback(hci: &mut Hci, callback: cxx::UniquePtr) { - assert!(!hci.le_evt_callback_set); - hci.le_evt_callback_set = true; - - let stream = hci.internal.le_evt_rx.clone(); - hci.rt.spawn(async move { - while let Some(item) = stream.lock().await.recv().await { - callback.Run(&item.to_bytes()); - } - }); +pub fn hci_set_le_evt_callback(hci: &mut Hci, cb: cxx::UniquePtr) { + hci.internal.le_evt_rx.stream_runnable(&hci.rt, CallbackWrapper { cb }); } -- 2.11.0