Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Tokio 1.0 #753

Merged
merged 36 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e3c4c70
Upgrade to Tokio 0.3
janpetschexain Nov 20, 2020
fc71e96
Fix uds test
aknuds1 Nov 29, 2020
610f3dc
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Dec 6, 2020
d4c9344
Merge branch 'master' into chore/upgrade-tokio
jxs Dec 9, 2020
6308e90
Switch to tokio-tungstenite v0.12
aknuds1 Dec 10, 2020
2de7e08
Upgrade to tokio 1.0
aknuds1 Dec 24, 2020
305c888
Depend on forked tokio-tungstenite
aknuds1 Dec 24, 2020
b476d81
Upgrade tokio-tungstenite
aknuds1 Dec 24, 2020
fa8a9e8
Fix build failures
aknuds1 Dec 24, 2020
5d068ae
Update src/filters/body.rs
aknuds1 Dec 24, 2020
f242193
Fix build failures
aknuds1 Dec 24, 2020
3f038da
Update src/test.rs
aknuds1 Dec 24, 2020
2dd9bcf
Switch to async-compression 0.3.7
aknuds1 Dec 24, 2020
7bfc760
Fix build failures
aknuds1 Dec 24, 2020
9722b25
Fix build failures
aknuds1 Dec 24, 2020
7f97567
Upgrade to tokio 1.0
aknuds1 Dec 25, 2020
127fff2
Fix Unix socket example
aknuds1 Dec 25, 2020
79df1c5
Fix test
aknuds1 Dec 25, 2020
2df48ca
Fix websocket upgrading
aknuds1 Dec 25, 2020
f6508fa
Use poll_read_buf from tokio-util
aknuds1 Dec 26, 2020
3c1d0ad
Move dependency in Cargo.toml
aknuds1 Dec 28, 2020
2f2206c
Fix tokio-tungstenite dependency
aknuds1 Dec 31, 2020
a9f2885
Use tokio-stream wrapper types
paolobarbolini Jan 5, 2021
7ff37e3
Merge pull request #2 from paolobarbolini/tokio-stream
aknuds1 Jan 5, 2021
f98e363
Add route at `/` in routing example (#771)
ranile Jan 2, 2021
6326d05
Fix typo in routing example (#772)
teenjuna Jan 3, 2021
b9ef973
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 7, 2021
33ede73
Upgrade to latest tokio-tungstenite
aknuds1 Jan 9, 2021
e50c375
Use ReaderStream instead of FramedRead
aknuds1 Jan 10, 2021
d8bfb02
Fix style
aknuds1 Jan 10, 2021
e989869
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 12, 2021
4d92fa8
CI: Remove net job
aknuds1 Jan 12, 2021
657f1c6
Remove client feature for non-dev hyper dependency
aknuds1 Jan 12, 2021
bb2f511
Simplify websocket filter
aknuds1 Jan 12, 2021
5ac7a62
Reintroduce dependency on hyper's client feature
aknuds1 Jan 12, 2021
b095a6d
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ jobs:
benches: true
- build: tls
features: "--features tls"
- build: uds
features: "--features tokio/uds"
- build: net
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
features: "--features tokio/net"
- build: no-default-features
features: "--no-default-features"
- build: compression
Expand Down
17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ edition = "2018"
all-features = true

[dependencies]
async-compression = { version = "0.3.1", features = ["brotli", "deflate", "gzip", "stream"], optional = true }
bytes = "0.5"
async-compression = { version = "0.3.7", features = ["brotli", "deflate", "gzip", "tokio"], optional = true }
bytes = "1.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
headers = "0.3"
http = "0.2"
hyper = { version = "0.13", features = ["stream"] }
hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp", "client"] }
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0"
Expand All @@ -31,23 +31,25 @@ scoped-tls = "1.0"
serde = "1.0"
serde_json = "1.0"
serde_urlencoded = "0.7"
tokio = { version = "0.2", features = ["fs", "stream", "sync", "time"] }
tokio = { version = "1.0", features = ["fs", "sync", "time"] }
tokio-util = { version = "0.6", features = ["io"] }
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"] }
tower-service = "0.3"
# tls is enabled by default, we don't want that yet
tokio-tungstenite = { version = "0.11", default-features = false, optional = true }
tokio-tungstenite = { git = "https://github.com/aknuds1/tokio-tungstenite.git", rev = "21c3d35b5ef91c2092dd67f6f2ada71d81c4b544", default-features = false, optional = true }
percent-encoding = "2.1"
pin-project = "1.0"
tokio-rustls = { version = "0.14", optional = true }
tokio-rustls = { version = "0.22", optional = true }
async-stream = "0.3"
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
pretty_env_logger = "0.4"
tracing-subscriber = "0.2.7"
tracing-log = "0.1"
serde_derive = "1.0"
handlebars = "3.0.0"
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
listenfd = "0.3"

[features]
Expand Down Expand Up @@ -78,7 +80,6 @@ required-features = ["compression"]

[[example]]
name = "unix_socket"
required-features = ["tokio/uds"]

[[example]]
name = "websockets"
Expand Down
2 changes: 1 addition & 1 deletion examples/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() {
}

async fn sleepy(Seconds(seconds): Seconds) -> Result<impl warp::Reply, Infallible> {
tokio::time::delay_for(Duration::from_secs(seconds)).await;
tokio::time::sleep(Duration::from_secs(seconds)).await;
Ok(format!("I waited {} seconds!", seconds))
}

Expand Down
9 changes: 8 additions & 1 deletion examples/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::convert::Infallible;
use std::time::Duration;
use tokio::time::interval;
use warp::{sse::ServerSentEvent, Filter};
use async_stream::stream;

// create server-sent event
fn sse_counter(counter: u64) -> Result<impl ServerSentEvent, Infallible> {
Expand All @@ -16,7 +17,13 @@ async fn main() {
let routes = warp::path("ticks").and(warp::get()).map(|| {
let mut counter: u64 = 0;
// create server event source
let event_stream = interval(Duration::from_secs(1)).map(move |_| {
let mut interval = interval(Duration::from_secs(1));
let stream = stream! {
while let item = interval.tick().await {
yield item;
}
};
let event_stream = stream.map(move |_| {
counter += 1;
sse_counter(counter)
});
Expand Down
3 changes: 2 additions & 1 deletion examples/sse_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::{
};
use tokio::sync::mpsc;
use warp::{sse::ServerSentEvent, Filter};
use warp::test;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -85,7 +86,7 @@ fn user_connected(

// Use an unbounded channel to handle buffering and flushing of messages
// to the event source...
let (tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = test::unbounded_channel_stream();

tx.send(Message::UserId(my_id))
// rx is right above, so this cannot fail
Expand Down
2 changes: 1 addition & 1 deletion examples/unix_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tokio::net::UnixListener;
async fn main() {
pretty_env_logger::init();

let mut listener = UnixListener::bind("/tmp/warp.sock").unwrap();
let listener = UnixListener::bind("/tmp/warp.sock").unwrap();
let incoming = listener.incoming();
warp::serve(warp::fs::dir("examples/dir"))
.run_incoming(incoming)
Expand Down
3 changes: 2 additions & 1 deletion examples/websockets_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use warp::ws::{Message, WebSocket};
use warp::Filter;
use warp::test;

/// Our global unique user id counter.
static NEXT_USER_ID: AtomicUsize = AtomicUsize::new(1);
Expand Down Expand Up @@ -58,7 +59,7 @@ async fn user_connected(ws: WebSocket, users: Users) {

// Use an unbounded channel to handle buffering and flushing of messages
// to the websocket...
let (tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = test::unbounded_channel_stream();
tokio::task::spawn(rx.forward(user_ws_tx).map(|result| {
if let Err(e) = result {
eprintln!("websocket send error: {}", e);
Expand Down
4 changes: 2 additions & 2 deletions src/filters/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{buf::BufExt, Buf, Bytes};
use bytes::{Buf, Bytes};
use futures::{future, ready, Stream, TryFutureExt};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
Expand Down Expand Up @@ -232,7 +232,7 @@ impl Decode for Json {
const WITH_NO_CONTENT_TYPE: bool = true;

fn decode<B: Buf, T: DeserializeOwned>(mut buf: B) -> Result<T, BoxError> {
serde_json::from_slice(&buf.to_bytes()).map_err(Into::into)
serde_json::from_slice(&buf.copy_to_bytes(buf.remaining())).map_err(Into::into)
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/filters/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
//!
//! Filters that compress the body of a response.

use async_compression::stream::{BrotliEncoder, DeflateEncoder, GzipEncoder};
use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder};
use http::header::HeaderValue;
use hyper::{
header::{CONTENT_ENCODING, CONTENT_LENGTH},
Body,
};
use tokio_util::io::StreamReader;
use tokio_util::codec::{BytesCodec, FramedRead};

use crate::filter::{Filter, WrapSealed};
use crate::reject::IsReject;
Expand Down Expand Up @@ -56,7 +58,7 @@ pub struct Compression<F> {
/// ```
pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(GzipEncoder::new(props.body));
let body = Body::wrap_stream(FramedRead::new(GzipEncoder::new(StreamReader::new(props.body)), BytesCodec::new()));
props
.head
.headers
Expand All @@ -82,7 +84,7 @@ pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
/// ```
pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(DeflateEncoder::new(props.body));
let body = Body::wrap_stream(FramedRead::new(DeflateEncoder::new(StreamReader::new(props.body)), BytesCodec::new()));
props
.head
.headers
Expand All @@ -108,7 +110,7 @@ pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
/// ```
pub fn brotli() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(BrotliEncoder::new(props.body));
let body = Body::wrap_stream(FramedRead::new(BrotliEncoder::new(StreamReader::new(props.body)), BytesCodec::new()));
props
.head
.headers
Expand Down
39 changes: 35 additions & 4 deletions src/filters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ use std::convert::Infallible;
use std::fs::Metadata;
use std::future::Future;
use std::io;
use std::mem::MaybeUninit;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::{Context, Poll};

use bytes::{Bytes, BytesMut};
use bytes::{BufMut, Bytes, BytesMut};
use futures::future::Either;
use futures::{future, ready, stream, FutureExt, Stream, StreamExt, TryFutureExt};
use headers::{
Expand All @@ -22,7 +23,7 @@ use hyper::Body;
use mime_guess;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};

use crate::filter::{Filter, FilterClone, One};
use crate::reject::{self, Rejection};
Expand Down Expand Up @@ -419,7 +420,7 @@ fn file_stream(
}
reserve_at_least(&mut buf, buf_size);

let n = match ready!(Pin::new(&mut f).poll_read_buf(cx, &mut buf)) {
let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
Ok(n) => n as u64,
Err(err) => {
tracing::debug!("file read error: {}", err);
Expand Down Expand Up @@ -524,3 +525,33 @@ mod tests {
assert_eq!(buf.capacity(), cap);
}
}

fn poll_read_buf<T: AsyncRead, B: BufMut>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let n = {
let dst = buf.chunk_mut();
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
jxs marked this conversation as resolved.
Show resolved Hide resolved
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
ready!(io.poll_read(cx, &mut buf)?);

// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
buf.filled().len()
};

// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}

Poll::Ready(Ok(n))
}
9 changes: 5 additions & 4 deletions src/filters/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use hyper::Body;
use pin_project::pin_project;
use serde::Serialize;
use serde_json;
use tokio::time::{self, Delay};
use tokio::time::{self, Sleep};

use self::sealed::{
BoxedServerSentEvent, EitherServerSentEvent, SseError, SseField, SseFormat, SseWrapper,
Expand Down Expand Up @@ -467,7 +467,7 @@ impl KeepAlive {
S::Ok: ServerSentEvent + Send,
S::Error: StdError + Send + Sync + 'static,
{
let alive_timer = time::delay_for(self.max_interval);
let alive_timer = time::sleep(self.max_interval);
SseKeepAlive {
event_stream,
comment_text: self.comment_text,
Expand All @@ -484,7 +484,8 @@ struct SseKeepAlive<S> {
event_stream: S,
comment_text: Cow<'static, str>,
max_interval: Duration,
alive_timer: Delay,
#[pin]
alive_timer: Sleep,
}

#[doc(hidden)]
Expand All @@ -505,7 +506,7 @@ where
let max_interval = keep_interval
.into()
.unwrap_or_else(|| Duration::from_secs(15));
let alive_timer = time::delay_for(max_interval);
let alive_timer = time::sleep(max_interval);
SseKeepAlive {
event_stream,
comment_text: Cow::Borrowed(""),
Expand Down
5 changes: 1 addition & 4 deletions src/filters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ where
fn into_response(self) -> Response {
let on_upgrade = self.on_upgrade;
let config = self.ws.config;
let fut = self
.ws
.body
.on_upgrade()
let fut = hyper::upgrade::on(Response::new(self.ws.body))
aknuds1 marked this conversation as resolved.
Show resolved Hide resolved
.and_then(move |upgraded| {
tracing::trace!("websocket upgrade complete");
WebSocket::from_raw_socket(upgraded, protocol::Role::Server, config).map(Ok)
Expand Down
30 changes: 21 additions & 9 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,17 @@ use std::pin::Pin;
use std::task::{self, Poll};

use bytes::Bytes;
#[cfg(feature = "websocket")]
use futures::StreamExt;
use futures::{future, FutureExt, TryFutureExt};
use futures::{future, FutureExt, TryFutureExt, StreamExt};
use http::{
header::{HeaderName, HeaderValue},
Response,
};
use serde::Serialize;
use serde_json;
#[cfg(feature = "websocket")]
use tokio::sync::{mpsc, oneshot};
use tokio::sync::{oneshot};
use tokio::sync::mpsc::{self, UnboundedSender};
use async_stream::stream;

use crate::filter::Filter;
use crate::reject::IsReject;
Expand Down Expand Up @@ -405,6 +405,18 @@ impl RequestBuilder {
}
}

/// Get an unbounded channel with a stream for the reader.
pub fn unbounded_channel_stream<T: Unpin>() -> (UnboundedSender<T>, impl StreamExt<Item = T>) {
let (tx, mut rx) = mpsc::unbounded_channel();
let stream = stream! {
while let Some(item) = rx.recv().await {
yield item;
}
};

(tx, stream)
}

#[cfg(feature = "websocket")]
impl WsBuilder {
/// Sets the request path of this builder.
Expand Down Expand Up @@ -482,7 +494,7 @@ impl WsBuilder {
F::Error: IsReject + Send,
{
let (upgraded_tx, upgraded_rx) = oneshot::channel();
let (wr_tx, wr_rx) = mpsc::unbounded_channel();
let (wr_tx, wr_rx) = unbounded_channel_stream();
let (rd_tx, rd_rx) = mpsc::unbounded_channel();

tokio::spawn(async move {
Expand Down Expand Up @@ -515,7 +527,7 @@ impl WsBuilder {
let upgrade = ::hyper::Client::builder()
.build(AddrConnect(addr))
.request(req)
.and_then(|res| res.into_body().on_upgrade());
.and_then(|res| hyper::upgrade::on(res));

let upgraded = match upgrade.await {
Ok(up) => {
Expand Down Expand Up @@ -576,9 +588,9 @@ impl WsClient {
/// Receive a websocket message from the server.
pub async fn recv(&mut self) -> Result<crate::filters::ws::Message, WsError> {
self.rx
.next()
.recv()
.await
.map(|unbounded_result| unbounded_result.map_err(WsError::new))
.map(|result| result.map_err(WsError::new))
.unwrap_or_else(|| {
// websocket is closed
Err(WsError::new("closed"))
Expand All @@ -588,7 +600,7 @@ impl WsClient {
/// Assert the server has closed the connection.
pub async fn recv_closed(&mut self) -> Result<(), WsError> {
self.rx
.next()
.recv()
.await
.map(|result| match result {
Ok(msg) => Err(WsError::new(format!("received message: {:?}", msg))),
Expand Down
Loading