From b4976675fa9af48fff61c05842154c4c04cded49 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Wed, 23 Dec 2020 10:01:44 -0800 Subject: [PATCH] Update to Tokio and Bytes 1.0 (#504) --- Cargo.toml | 16 ++++++++-------- examples/akamai.rs | 5 +++++ src/codec/framed_write.rs | 8 ++++---- src/hpack/decoder.rs | 4 ++-- src/proto/streams/prioritize.rs | 8 ++++++-- tests/h2-fuzz/Cargo.toml | 4 ++-- tests/h2-support/Cargo.toml | 6 +++--- tests/h2-support/src/mock.rs | 2 +- tests/h2-tests/Cargo.toml | 2 +- tests/h2-tests/tests/flow_control.rs | 15 ++++++--------- 10 files changed, 38 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 390a1576f..b6668993d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,9 +45,9 @@ members = [ futures-core = { version = "0.3", default-features = false } futures-sink = { version = "0.3", default-features = false } futures-util = { version = "0.3", default-features = false } -tokio-util = { version = "0.5", features = ["codec"] } -tokio = { version = "0.3.4", features = ["io-util"] } -bytes = "0.6" +tokio-util = { version = "0.6", features = ["codec"] } +tokio = { version = "1", features = ["io-util"] } +bytes = "1" http = "0.2" tracing = { version = "0.1.13", default-features = false, features = ["std"] } tracing-futures = { version = "0.2", default-features = false, features = ["std-future"]} @@ -68,9 +68,9 @@ serde = "1.0.0" serde_json = "1.0.0" # Examples -tokio = { version = "0.3.4", features = ["rt-multi-thread", "macros", "sync", "net"] } +tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "net"] } env_logger = { version = "0.5.3", default-features = false } -rustls = "0.18" -tokio-rustls = "0.20.0" -webpki = "0.21" -webpki-roots = "0.17" +#rustls = "0.18" +#tokio-rustls = "0.20.0" +#webpki = "0.21" +#webpki-roots = "0.17" diff --git a/examples/akamai.rs b/examples/akamai.rs index 29d8a9347..ebd09ad0d 100644 --- a/examples/akamai.rs +++ b/examples/akamai.rs @@ -1,3 +1,7 @@ +fn main() { + eprintln!("TODO: Re-enable when tokio-rustls is upgraded."); +} +/* use h2::client; use http::{Method, Request}; use tokio::net::TcpStream; @@ -73,3 +77,4 @@ pub async fn main() -> Result<(), Box> { } Ok(()) } +*/ diff --git a/src/codec/framed_write.rs b/src/codec/framed_write.rs index e2151d660..8ec2045ce 100644 --- a/src/codec/framed_write.rs +++ b/src/codec/framed_write.rs @@ -202,10 +202,10 @@ where // could just use `poll_write_buf`... let n = if self.is_write_vectored { let mut bufs = [IoSlice::new(&[]); MAX_IOVS]; - let cnt = buf.bytes_vectored(&mut bufs); + let cnt = buf.chunks_vectored(&mut bufs); ready!(Pin::new(&mut self.inner).poll_write_vectored(cx, &bufs[..cnt]))? } else { - ready!(Pin::new(&mut self.inner).poll_write(cx, buf.bytes()))? + ready!(Pin::new(&mut self.inner).poll_write(cx, buf.chunk()))? }; buf.advance(n); } @@ -213,12 +213,12 @@ where tracing::trace!(queued_data_frame = false); let n = if self.is_write_vectored { let mut iovs = [IoSlice::new(&[]); MAX_IOVS]; - let cnt = self.buf.bytes_vectored(&mut iovs); + let cnt = self.buf.chunks_vectored(&mut iovs); ready!( Pin::new(&mut self.inner).poll_write_vectored(cx, &mut iovs[..cnt]) )? } else { - ready!(Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.bytes()))? + ready!(Pin::new(&mut self.inner).poll_write(cx, &mut self.buf.chunk()))? }; self.buf.advance(n); } diff --git a/src/hpack/decoder.rs b/src/hpack/decoder.rs index aba673d37..39afc8ad1 100644 --- a/src/hpack/decoder.rs +++ b/src/hpack/decoder.rs @@ -311,7 +311,7 @@ impl Decoder { if huff { let ret = { - let raw = &buf.bytes()[..len]; + let raw = &buf.chunk()[..len]; huffman::decode(raw, &mut self.buffer).map(BytesMut::freeze) }; @@ -419,7 +419,7 @@ fn decode_int(buf: &mut B, prefix_size: u8) -> Result(buf: &mut B) -> Option { if buf.has_remaining() { - Some(buf.bytes()[0]) + Some(buf.chunk()[0]) } else { None } diff --git a/src/proto/streams/prioritize.rs b/src/proto/streams/prioritize.rs index 96b65d7ad..b7b616fac 100644 --- a/src/proto/streams/prioritize.rs +++ b/src/proto/streams/prioritize.rs @@ -847,8 +847,12 @@ where self.inner.remaining() } - fn bytes(&self) -> &[u8] { - self.inner.bytes() + fn chunk(&self) -> &[u8] { + self.inner.chunk() + } + + fn chunks_vectored<'a>(&'a self, dst: &mut [std::io::IoSlice<'a>]) -> usize { + self.inner.chunks_vectored(dst) } fn advance(&mut self, cnt: usize) { diff --git a/tests/h2-fuzz/Cargo.toml b/tests/h2-fuzz/Cargo.toml index d76a8f609..7fbf4c3f3 100644 --- a/tests/h2-fuzz/Cargo.toml +++ b/tests/h2-fuzz/Cargo.toml @@ -11,5 +11,5 @@ h2 = { path = "../.." } env_logger = { version = "0.5.3", default-features = false } futures = { version = "0.3", default-features = false, features = ["std"] } honggfuzz = "0.5" -http = { git = "https://github.com/paolobarbolini/http.git", branch = "bytes06" } -tokio = { version = "0.3.2", features = [] } +http = "0.2" +tokio = "1" diff --git a/tests/h2-support/Cargo.toml b/tests/h2-support/Cargo.toml index c441ded9c..e97c6b310 100644 --- a/tests/h2-support/Cargo.toml +++ b/tests/h2-support/Cargo.toml @@ -7,10 +7,10 @@ edition = "2018" [dependencies] h2 = { path = "../..", features = ["stream", "unstable"] } -bytes = "0.6" +bytes = "1" tracing = "0.1" tracing-subscriber = { version = "0.2", default-features = false, features = ["fmt", "chrono", "ansi"] } futures = { version = "0.3", default-features = false } http = "0.2" -tokio = { version = "0.3.2", features = ["time"] } -tokio-test = "0.3" +tokio = { version = "1", features = ["time"] } +tokio-test = "0.4" diff --git a/tests/h2-support/src/mock.rs b/tests/h2-support/src/mock.rs index ebfc094c1..4f81de239 100644 --- a/tests/h2-support/src/mock.rs +++ b/tests/h2-support/src/mock.rs @@ -147,7 +147,7 @@ impl Handle { poll_fn(move |cx| { while buf.has_remaining() { let res = Pin::new(self.codec.get_mut()) - .poll_write(cx, &mut buf.bytes()) + .poll_write(cx, &mut buf.chunk()) .map_err(|e| panic!("write err={:?}", e)); let n = ready!(res).unwrap(); diff --git a/tests/h2-tests/Cargo.toml b/tests/h2-tests/Cargo.toml index ac16043b3..33436f3c4 100644 --- a/tests/h2-tests/Cargo.toml +++ b/tests/h2-tests/Cargo.toml @@ -11,4 +11,4 @@ edition = "2018" h2-support = { path = "../h2-support" } tracing = "0.1.13" futures = { version = "0.3", default-features = false, features = ["alloc"] } -tokio = { version = "0.3.2", features = ["macros", "net", "rt", "io-util"] } +tokio = { version = "1", features = ["macros", "net", "rt", "io-util"] } diff --git a/tests/h2-tests/tests/flow_control.rs b/tests/h2-tests/tests/flow_control.rs index 08019bbae..1b86cadb2 100644 --- a/tests/h2-tests/tests/flow_control.rs +++ b/tests/h2-tests/tests/flow_control.rs @@ -940,7 +940,6 @@ async fn recv_no_init_window_then_receive_some_init_window() { #[tokio::test] async fn settings_lowered_capacity_returns_capacity_to_connection() { use futures::channel::oneshot; - use futures::future::{select, Either}; h2_support::trace_init!(); let (io, mut srv) = mock::new(); @@ -972,10 +971,9 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() { // // A timeout is used here to avoid blocking forever if there is a // failure - let result = select(rx2, tokio::time::sleep(Duration::from_secs(5))).await; - if let Either::Right((_, _)) = result { - panic!("Timed out"); - } + let _ = tokio::time::timeout(Duration::from_secs(5), rx2) + .await + .unwrap(); idle_ms(500).await; @@ -1004,10 +1002,9 @@ async fn settings_lowered_capacity_returns_capacity_to_connection() { }); // Wait for server handshake to complete. - let result = select(rx1, tokio::time::sleep(Duration::from_secs(5))).await; - if let Either::Right((_, _)) = result { - panic!("Timed out"); - } + let _ = tokio::time::timeout(Duration::from_secs(5), rx1) + .await + .unwrap(); let request = Request::post("https://example.com/one").body(()).unwrap();