OSDN Git Service

rusty-gd: update hal facade
authorZach Johnson <zachoverflow@google.com>
Wed, 23 Dec 2020 02:02:19 +0000 (18:02 -0800)
committerZach Johnson <zachoverflow@google.com>
Wed, 13 Jan 2021 23:13:59 +0000 (15:13 -0800)
use RpcContext & RxAdapter

Bug: 171749953
Tag: #gd-refactor
Test: gd/cert/run --rhost SimpleHalTest
Change-Id: Ia401bcf0f4fcf4c14db4c93d96590c88b606c3af

gd/rust/hal/Android.bp
gd/rust/hal/src/facade.rs

index 3429683..71817cc 100644 (file)
@@ -20,6 +20,7 @@ rust_library {
         "liblog_rust",
         "libbt_common",
         "libnum_traits",
+        "libbt_facade_helpers",
     ],
     proc_macros: [
         "libnum_derive",
index 411ada9..49ab08c 100644 (file)
@@ -2,15 +2,13 @@
 
 use crate::{AclHal, ControlHal};
 use bt_common::GrpcFacade;
+use bt_facade_helpers::RxAdapter;
 use bt_facade_proto::common::Data;
 use bt_facade_proto::empty::Empty;
 use bt_facade_proto::hal_facade_grpc::{create_hci_hal_facade, HciHalFacade};
-use bt_packets::hci::{AclPacket, CommandPacket, Packet};
-use futures::sink::SinkExt;
+use bt_packets::hci::{AclPacket, CommandPacket, EventPacket};
 use gddi::{module, provides, Stoppable};
 use grpcio::*;
-use std::sync::Arc;
-use tokio::runtime::Runtime;
 
 module! {
     hal_facade_module,
@@ -20,14 +18,20 @@ module! {
 }
 
 #[provides]
-async fn provide_facade(control: ControlHal, acl: AclHal, rt: Arc<Runtime>) -> HciHalFacadeService {
-    HciHalFacadeService { rt, control, acl }
+async fn provide_facade(control: ControlHal, acl: AclHal) -> HciHalFacadeService {
+    HciHalFacadeService {
+        evt_rx: RxAdapter::from_arc(control.rx.clone()),
+        acl_rx: RxAdapter::from_arc(acl.rx.clone()),
+        control,
+        acl,
+    }
 }
 
 /// HCI HAL facade service
 #[derive(Clone, Stoppable)]
 pub struct HciHalFacadeService {
-    rt: Arc<Runtime>,
+    evt_rx: RxAdapter<EventPacket>,
+    acl_rx: RxAdapter<AclPacket>,
     control: ControlHal,
     acl: AclHal,
 }
@@ -39,20 +43,20 @@ impl GrpcFacade for HciHalFacadeService {
 }
 
 impl HciHalFacade for HciHalFacadeService {
-    fn send_command(&mut self, _ctx: RpcContext<'_>, mut data: Data, sink: UnarySink<Empty>) {
+    fn send_command(&mut self, ctx: RpcContext<'_>, mut data: Data, sink: UnarySink<Empty>) {
         let cmd_tx = self.control.tx.clone();
-        self.rt.block_on(async move {
+        ctx.spawn(async move {
             cmd_tx.send(CommandPacket::parse(&data.take_payload()).unwrap()).await.unwrap();
+            sink.success(Empty::default()).await.unwrap();
         });
-        sink.success(Empty::default());
     }
 
-    fn send_acl(&mut self, _ctx: RpcContext<'_>, mut data: Data, sink: UnarySink<Empty>) {
+    fn send_acl(&mut self, ctx: RpcContext<'_>, mut data: Data, sink: UnarySink<Empty>) {
         let acl_tx = self.acl.tx.clone();
-        self.rt.block_on(async move {
+        ctx.spawn(async move {
             acl_tx.send(AclPacket::parse(&data.take_payload()).unwrap()).await.unwrap();
+            sink.success(Empty::default()).await.unwrap();
         });
-        sink.success(Empty::default());
     }
 
     fn send_sco(&mut self, _ctx: RpcContext<'_>, _sco: Data, _sink: UnarySink<Empty>) {
@@ -63,31 +67,12 @@ impl HciHalFacade for HciHalFacadeService {
         unimplemented!()
     }
 
-    fn stream_events(
-        &mut self,
-        _ctx: RpcContext<'_>,
-        _: Empty,
-        mut sink: ServerStreamingSink<Data>,
-    ) {
-        let evt_rx = self.control.rx.clone();
-        self.rt.spawn(async move {
-            while let Some(event) = evt_rx.lock().await.recv().await {
-                let mut output = Data::default();
-                output.set_payload(event.to_vec());
-                sink.send((output, WriteFlags::default())).await.unwrap();
-            }
-        });
+    fn stream_events(&mut self, ctx: RpcContext<'_>, _: Empty, sink: ServerStreamingSink<Data>) {
+        self.evt_rx.stream_grpc(ctx, sink);
     }
 
-    fn stream_acl(&mut self, _ctx: RpcContext<'_>, _: Empty, mut sink: ServerStreamingSink<Data>) {
-        let acl_rx = self.acl.rx.clone();
-        self.rt.spawn(async move {
-            while let Some(acl) = acl_rx.lock().await.recv().await {
-                let mut output = Data::default();
-                output.set_payload(acl.to_vec());
-                sink.send((output, WriteFlags::default())).await.unwrap();
-            }
-        });
+    fn stream_acl(&mut self, ctx: RpcContext<'_>, _: Empty, sink: ServerStreamingSink<Data>) {
+        self.acl_rx.stream_grpc(ctx, sink);
     }
 
     fn stream_sco(&mut self, _ctx: RpcContext<'_>, _: Empty, _sink: ServerStreamingSink<Data>) {