Some functions have been renamed or removed as part of the tokio
1.x upgrade. This CL refactors the code to address that.
Also included are a couple minor rustfmt style changes as part of
submission.
Bug:
177808007
Test: mma
Change-Id: Ia147257b0cddb628866b34c9f755b1fe3c63e46d
rustlibs: [
"libtokio",
"libnix",
+ "liblazy_static",
"liblog_rust",
"libcxx",
"libgrpcio",
rustlibs: [
"libtokio",
"libnix",
+ "liblazy_static",
"liblog_rust",
"libenv_logger",
"libcxx",
use grpcio::*;
use log::debug;
use nix::sys::signal;
-use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr};
+use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
+use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use tokio::runtime::Runtime;
.default_value("8897")
.takes_value(true),
)
- .arg(
- Arg::with_name("grpc-port")
- .long("grpc-port")
- .default_value("8899")
- .takes_value(true),
- )
+ .arg(Arg::with_name("grpc-port").long("grpc-port").default_value("8899").takes_value(true))
.arg(
Arg::with_name("signal-port")
.long("signal-port")
.default_value("8895")
.takes_value(true),
)
- .arg(
- Arg::with_name("rootcanal-port")
- .long("rootcanal-port")
- .takes_value(true),
- )
+ .arg(Arg::with_name("rootcanal-port").long("rootcanal-port").takes_value(true))
.arg(Arg::with_name("btsnoop").long("btsnoop").takes_value(true))
- .arg(
- Arg::with_name("btconfig")
- .long("btconfig")
- .takes_value(true),
- )
+ .arg(Arg::with_name("btconfig").long("btconfig").takes_value(true))
.get_matches();
let root_server_port = value_t!(matches, "root-server-port", u16).unwrap();
async fn indicate_started(signal_port: u16) {
let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), signal_port);
- let stream = TcpStream::connect(address).await.unwrap();
- stream.shutdown(Shutdown::Both).unwrap();
+ let mut stream = TcpStream::connect(address).await.unwrap();
+ stream.shutdown().await.unwrap();
}
// TODO: remove as this is a temporary nix-based hack to catch SIGINT
"libnum_traits",
"libthiserror",
"libtokio",
+ "libtokio_stream",
"libprotobuf",
"libgddi",
"liblog_rust",
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::{oneshot, Mutex};
+use tokio_stream::wrappers::ReceiverStream;
module! {
core_module,
match bt {
Classic => {
classic_outbound.push(fragmenting_stream(
- in_rx, controller.acl_buffer_length.into(), handle, bt, close_rx));
+ ReceiverStream::new(in_rx), controller.acl_buffer_length.into(), handle, bt, close_rx));
},
Le => {
le_outbound.push(fragmenting_stream(
- in_rx, controller.le_buffer_length.into(), handle, bt, close_rx));
+ ReceiverStream::new(in_rx), controller.le_buffer_length.into(), handle, bt, close_rx));
},
}
use bytes::{Buf, Bytes, BytesMut};
use futures::stream::{self, StreamExt};
use log::{error, info, warn};
-use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
+use tokio_stream::wrappers::ReceiverStream;
const L2CAP_BASIC_FRAME_HEADER_LEN: usize = 4;
}
pub fn fragmenting_stream(
- rx: Receiver<Bytes>,
+ rx: ReceiverStream<Bytes>,
mtu: usize,
handle: u16,
bt: Bluetooth,
lazy_static! {
pub static ref RUNTIME: Arc<Runtime> = Arc::new(
- Builder::new_multi_thread().worker_threads(1).max_threads(1).enable_all().build().unwrap()
+ Builder::new_multi_thread()
+ .worker_threads(1)
+ .max_blocking_threads(1)
+ .enable_all()
+ .build()
+ .unwrap()
);
}