OSDN Git Service

Use new tokio 1.x API functions.
authorIvan Lozano <ivanlozano@google.com>
Wed, 20 Jan 2021 14:28:28 +0000 (09:28 -0500)
committerIvan Lozano <ivanlozano@google.com>
Tue, 26 Jan 2021 02:28:24 +0000 (21:28 -0500)
Some functions have been renamed or removed as part of the tokio
1.x upgrade. This CL refactors the code to address that.

Also included are a couple minor rustfmt style changes as part of
submission.

Bug: 177808007
Test: mma
Change-Id: Ia147257b0cddb628866b34c9f755b1fe3c63e46d

gd/rust/common/Android.bp
gd/rust/facade/src/main.rs
gd/rust/link/Android.bp
gd/rust/link/src/acl/core.rs
gd/rust/link/src/acl/fragment.rs
gd/rust/shim/src/stack.rs

index 7ddc6f9..40e3da2 100644 (file)
@@ -7,6 +7,7 @@ rust_library {
     rustlibs: [
         "libtokio",
         "libnix",
+        "liblazy_static",
         "liblog_rust",
         "libcxx",
         "libgrpcio",
@@ -40,6 +41,7 @@ rust_test_host {
     rustlibs: [
         "libtokio",
         "libnix",
+        "liblazy_static",
         "liblog_rust",
         "libenv_logger",
         "libcxx",
index a7227d4..fd2fc48 100644 (file)
@@ -14,8 +14,9 @@ use futures::stream::StreamExt;
 use grpcio::*;
 use log::debug;
 use nix::sys::signal;
-use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
 use std::sync::{Arc, Mutex};
+use tokio::io::AsyncWriteExt;
 use tokio::net::TcpStream;
 use tokio::runtime::Runtime;
 
@@ -35,29 +36,16 @@ async fn async_main(rt: Arc<Runtime>, mut sigint: mpsc::UnboundedReceiver<()>) {
                 .default_value("8897")
                 .takes_value(true),
         )
-        .arg(
-            Arg::with_name("grpc-port")
-                .long("grpc-port")
-                .default_value("8899")
-                .takes_value(true),
-        )
+        .arg(Arg::with_name("grpc-port").long("grpc-port").default_value("8899").takes_value(true))
         .arg(
             Arg::with_name("signal-port")
                 .long("signal-port")
                 .default_value("8895")
                 .takes_value(true),
         )
-        .arg(
-            Arg::with_name("rootcanal-port")
-                .long("rootcanal-port")
-                .takes_value(true),
-        )
+        .arg(Arg::with_name("rootcanal-port").long("rootcanal-port").takes_value(true))
         .arg(Arg::with_name("btsnoop").long("btsnoop").takes_value(true))
-        .arg(
-            Arg::with_name("btconfig")
-                .long("btconfig")
-                .takes_value(true),
-        )
+        .arg(Arg::with_name("btconfig").long("btconfig").takes_value(true))
         .get_matches();
 
     let root_server_port = value_t!(matches, "root-server-port", u16).unwrap();
@@ -84,8 +72,8 @@ async fn async_main(rt: Arc<Runtime>, mut sigint: mpsc::UnboundedReceiver<()>) {
 
 async fn indicate_started(signal_port: u16) {
     let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), signal_port);
-    let stream = TcpStream::connect(address).await.unwrap();
-    stream.shutdown(Shutdown::Both).unwrap();
+    let mut stream = TcpStream::connect(address).await.unwrap();
+    stream.shutdown().await.unwrap();
 }
 
 // TODO: remove as this is a temporary nix-based hack to catch SIGINT
index a77409c..939fd02 100644 (file)
@@ -15,6 +15,7 @@ rust_library {
         "libnum_traits",
         "libthiserror",
         "libtokio",
+        "libtokio_stream",
         "libprotobuf",
         "libgddi",
         "liblog_rust",
index bb6583d..1e837ad 100644 (file)
@@ -16,6 +16,7 @@ use tokio::runtime::Runtime;
 use tokio::select;
 use tokio::sync::mpsc::{channel, Receiver, Sender};
 use tokio::sync::{oneshot, Mutex};
+use tokio_stream::wrappers::ReceiverStream;
 
 module! {
     core_module,
@@ -108,11 +109,11 @@ async fn provide_acl_dispatch(
                             match bt {
                                 Classic => {
                                     classic_outbound.push(fragmenting_stream(
-                                        in_rx, controller.acl_buffer_length.into(), handle, bt, close_rx));
+                                        ReceiverStream::new(in_rx), controller.acl_buffer_length.into(), handle, bt, close_rx));
                                 },
                                 Le => {
                                     le_outbound.push(fragmenting_stream(
-                                        in_rx, controller.le_buffer_length.into(), handle, bt, close_rx));
+                                        ReceiverStream::new(in_rx), controller.le_buffer_length.into(), handle, bt, close_rx));
                                 },
                             }
 
index b66b855..6e52cd5 100644 (file)
@@ -8,8 +8,9 @@ use bt_packets::hci::{AclBuilder, AclChild, AclPacket, BroadcastFlag};
 use bytes::{Buf, Bytes, BytesMut};
 use futures::stream::{self, StreamExt};
 use log::{error, info, warn};
-use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::sync::mpsc::Sender;
 use tokio::sync::oneshot;
+use tokio_stream::wrappers::ReceiverStream;
 
 const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;
 
@@ -87,7 +88,7 @@ fn get_l2cap_pdu_size(first_packet: &Bytes) -> usize {
 }
 
 pub fn fragmenting_stream(
-    rx: Receiver<Bytes>,
+    rx: ReceiverStream<Bytes>,
     mtu: usize,
     handle: u16,
     bt: Bluetooth,
index 67cf3dc..7dab8b9 100644 (file)
@@ -9,7 +9,12 @@ use tokio::runtime::{Builder, Runtime};
 
 lazy_static! {
     pub static ref RUNTIME: Arc<Runtime> = Arc::new(
-        Builder::new_multi_thread().worker_threads(1).max_threads(1).enable_all().build().unwrap()
+        Builder::new_multi_thread()
+            .worker_threads(1)
+            .max_blocking_threads(1)
+            .enable_all()
+            .build()
+            .unwrap()
     );
 }