Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to Tokio 1.0 #753

Merged
merged 36 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
e3c4c70
Upgrade to Tokio 0.3
janpetschexain Nov 20, 2020
fc71e96
Fix uds test
aknuds1 Nov 29, 2020
610f3dc
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Dec 6, 2020
d4c9344
Merge branch 'master' into chore/upgrade-tokio
jxs Dec 9, 2020
6308e90
Switch to tokio-tungstenite v0.12
aknuds1 Dec 10, 2020
2de7e08
Upgrade to tokio 1.0
aknuds1 Dec 24, 2020
305c888
Depend on forked tokio-tungstenite
aknuds1 Dec 24, 2020
b476d81
Upgrade tokio-tungstenite
aknuds1 Dec 24, 2020
fa8a9e8
Fix build failures
aknuds1 Dec 24, 2020
5d068ae
Update src/filters/body.rs
aknuds1 Dec 24, 2020
f242193
Fix build failures
aknuds1 Dec 24, 2020
3f038da
Update src/test.rs
aknuds1 Dec 24, 2020
2dd9bcf
Switch to async-compression 0.3.7
aknuds1 Dec 24, 2020
7bfc760
Fix build failures
aknuds1 Dec 24, 2020
9722b25
Fix build failures
aknuds1 Dec 24, 2020
7f97567
Upgrade to tokio 1.0
aknuds1 Dec 25, 2020
127fff2
Fix Unix socket example
aknuds1 Dec 25, 2020
79df1c5
Fix test
aknuds1 Dec 25, 2020
2df48ca
Fix websocket upgrading
aknuds1 Dec 25, 2020
f6508fa
Use poll_read_buf from tokio-util
aknuds1 Dec 26, 2020
3c1d0ad
Move dependency in Cargo.toml
aknuds1 Dec 28, 2020
2f2206c
Fix tokio-tungstenite dependency
aknuds1 Dec 31, 2020
a9f2885
Use tokio-stream wrapper types
paolobarbolini Jan 5, 2021
7ff37e3
Merge pull request #2 from paolobarbolini/tokio-stream
aknuds1 Jan 5, 2021
f98e363
Add route at `/` in routing example (#771)
ranile Jan 2, 2021
6326d05
Fix typo in routing example (#772)
teenjuna Jan 3, 2021
b9ef973
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 7, 2021
33ede73
Upgrade to latest tokio-tungstenite
aknuds1 Jan 9, 2021
e50c375
Use ReaderStream instead of FramedRead
aknuds1 Jan 10, 2021
d8bfb02
Fix style
aknuds1 Jan 10, 2021
e989869
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 12, 2021
4d92fa8
CI: Remove net job
aknuds1 Jan 12, 2021
657f1c6
Remove client feature for non-dev hyper dependency
aknuds1 Jan 12, 2021
bb2f511
Simplify websocket filter
aknuds1 Jan 12, 2021
5ac7a62
Reintroduce dependency on hyper's client feature
aknuds1 Jan 12, 2021
b095a6d
Merge remote-tracking branch 'seanmonstar/master' into chore/upgrade-…
aknuds1 Jan 15, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ jobs:
benches: true
- build: tls
features: "--features tls"
- build: uds
features: "--features tokio/uds"
- build: net
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
features: "--features tokio/net"
- build: no-default-features
features: "--no-default-features"
- build: compression
Expand Down
18 changes: 10 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ edition = "2018"
all-features = true

[dependencies]
async-compression = { version = "0.3.1", features = ["brotli", "deflate", "gzip", "stream"], optional = true }
bytes = "0.5"
async-compression = { version = "0.3.7", features = ["brotli", "deflate", "gzip", "tokio"], optional = true }
bytes = "1.0"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
headers = "0.3"
http = "0.2"
hyper = { version = "0.13", features = ["stream"] }
hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp", "client"] }
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0"
Expand All @@ -31,23 +31,26 @@ scoped-tls = "1.0"
serde = "1.0"
serde_json = "1.0"
serde_urlencoded = "0.7"
tokio = { version = "0.2", features = ["fs", "stream", "sync", "time"] }
tokio = { version = "1.0", features = ["fs", "sync", "time"] }
tokio-stream = "0.1.1"
tokio-util = { version = "0.6", features = ["io"] }
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"] }
tower-service = "0.3"
# tls is enabled by default, we don't want that yet
tokio-tungstenite = { version = "0.11", default-features = false, optional = true }
tokio-tungstenite = { version = "0.13", default-features = false, optional = true }
percent-encoding = "2.1"
pin-project = "1.0"
tokio-rustls = { version = "0.14", optional = true }
tokio-rustls = { version = "0.22", optional = true }

[dev-dependencies]
pretty_env_logger = "0.4"
tracing-subscriber = "0.2.7"
tracing-log = "0.1"
serde_derive = "1.0"
handlebars = "3.0.0"
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1.1", features = ["net"] }
listenfd = "0.3"

[features]
Expand Down Expand Up @@ -78,7 +81,6 @@ required-features = ["compression"]

[[example]]
name = "unix_socket"
required-features = ["tokio/uds"]

[[example]]
name = "websockets"
Expand Down
2 changes: 1 addition & 1 deletion examples/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() {
}

async fn sleepy(Seconds(seconds): Seconds) -> Result<impl warp::Reply, Infallible> {
tokio::time::delay_for(Duration::from_secs(seconds)).await;
tokio::time::sleep(Duration::from_secs(seconds)).await;
Ok(format!("I waited {} seconds!", seconds))
}

Expand Down
5 changes: 4 additions & 1 deletion examples/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use futures::StreamExt;
use std::convert::Infallible;
use std::time::Duration;
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;
use warp::{sse::ServerSentEvent, Filter};

// create server-sent event
Expand All @@ -16,7 +17,9 @@ async fn main() {
let routes = warp::path("ticks").and(warp::get()).map(|| {
let mut counter: u64 = 0;
// create server event source
let event_stream = interval(Duration::from_secs(1)).map(move |_| {
let interval = interval(Duration::from_secs(1));
let stream = IntervalStream::new(interval);
let event_stream = stream.map(move |_| {
counter += 1;
sse_counter(counter)
});
Expand Down
2 changes: 2 additions & 0 deletions examples/sse_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::{
Arc, Mutex,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::{sse::ServerSentEvent, Filter};

#[tokio::main]
Expand Down Expand Up @@ -86,6 +87,7 @@ fn user_connected(
// Use an unbounded channel to handle buffering and flushing of messages
// to the event source...
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);

tx.send(Message::UserId(my_id))
// rx is right above, so this cannot fail
Expand Down
5 changes: 3 additions & 2 deletions examples/unix_socket.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#![deny(warnings)]

use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;

#[tokio::main]
async fn main() {
pretty_env_logger::init();

let mut listener = UnixListener::bind("/tmp/warp.sock").unwrap();
let incoming = listener.incoming();
let listener = UnixListener::bind("/tmp/warp.sock").unwrap();
let incoming = UnixListenerStream::new(listener);
warp::serve(warp::fs::dir("examples/dir"))
.run_incoming(incoming)
.await;
Expand Down
2 changes: 2 additions & 0 deletions examples/websockets_chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::sync::{

use futures::{FutureExt, StreamExt};
use tokio::sync::{mpsc, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;
use warp::ws::{Message, WebSocket};
use warp::Filter;

Expand Down Expand Up @@ -59,6 +60,7 @@ async fn user_connected(ws: WebSocket, users: Users) {
// Use an unbounded channel to handle buffering and flushing of messages
// to the websocket...
let (tx, rx) = mpsc::unbounded_channel();
let rx = UnboundedReceiverStream::new(rx);
tokio::task::spawn(rx.forward(user_ws_tx).map(|result| {
if let Err(e) = result {
eprintln!("websocket send error: {}", e);
Expand Down
8 changes: 4 additions & 4 deletions src/filters/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{buf::BufExt, Buf, Bytes};
use bytes::{Buf, Bytes};
use futures::{future, ready, Stream, TryFutureExt};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
Expand Down Expand Up @@ -131,8 +131,8 @@ pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
/// fn full_body(mut body: impl Buf) {
/// // It could have several non-contiguous slices of memory...
/// while body.has_remaining() {
/// println!("slice = {:?}", body.bytes());
/// let cnt = body.bytes().len();
/// println!("slice = {:?}", body.chunk());
/// let cnt = body.chunk().len();
/// body.advance(cnt);
/// }
/// }
Expand Down Expand Up @@ -232,7 +232,7 @@ impl Decode for Json {
const WITH_NO_CONTENT_TYPE: bool = true;

fn decode<B: Buf, T: DeserializeOwned>(mut buf: B) -> Result<T, BoxError> {
serde_json::from_slice(&buf.to_bytes()).map_err(Into::into)
serde_json::from_slice(&buf.copy_to_bytes(buf.remaining())).map_err(Into::into)
}
}

Expand Down
19 changes: 15 additions & 4 deletions src/filters/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@
//!
//! Filters that compress the body of a response.

use async_compression::stream::{BrotliEncoder, DeflateEncoder, GzipEncoder};
use async_compression::tokio::bufread::{BrotliEncoder, DeflateEncoder, GzipEncoder};
use http::header::HeaderValue;
use hyper::{
header::{CONTENT_ENCODING, CONTENT_LENGTH},
Body,
};
use tokio_util::codec::{BytesCodec, FramedRead};
use tokio_util::io::StreamReader;

use crate::filter::{Filter, WrapSealed};
use crate::reject::IsReject;
Expand Down Expand Up @@ -56,7 +58,10 @@ pub struct Compression<F> {
/// ```
pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(GzipEncoder::new(props.body));
let body = Body::wrap_stream(FramedRead::new(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you prefer FramedRead over ReaderStream?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jxs To be honest, I don't remember exactly, but it was probably from an example. Do you think ReaderStream is preferable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @paolobarbolini, I can't bring myself to remember where I found this solution.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @paolobarbolini. @aknuds1 I asked because I was not sure either, Previously FrameReader was the way to achieve AsyncRead to Stream conversion, but since ReaderStream was introduced I would maybe suggest ReaderStream as it's simpler, and recommended for this cases on tokio-util doc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seanmonstar Do you have any opinion on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I converted from FramedRead into ReaderStream, ass @jxs recommended. I don't really know personally if there are any practical differences, but I can tell that the API is simpler (a second argument to FramedRead::new can be dropped). I trust @jxs knows more about this than I do, and I'll let @seanmonstar be the judge.

GzipEncoder::new(StreamReader::new(props.body)),
BytesCodec::new(),
));
props
.head
.headers
Expand All @@ -82,7 +87,10 @@ pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
/// ```
pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(DeflateEncoder::new(props.body));
let body = Body::wrap_stream(FramedRead::new(
DeflateEncoder::new(StreamReader::new(props.body)),
BytesCodec::new(),
));
props
.head
.headers
Expand All @@ -108,7 +116,10 @@ pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
/// ```
pub fn brotli() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(BrotliEncoder::new(props.body));
let body = Body::wrap_stream(FramedRead::new(
BrotliEncoder::new(StreamReader::new(props.body)),
BytesCodec::new(),
));
props
.head
.headers
Expand Down
5 changes: 3 additions & 2 deletions src/filters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use hyper::Body;
use mime_guess;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
use tokio::io::AsyncRead;
use tokio::io::AsyncSeekExt;
use tokio_util::io::poll_read_buf;

use crate::filter::{Filter, FilterClone, One};
use crate::reject::{self, Rejection};
Expand Down Expand Up @@ -419,7 +420,7 @@ fn file_stream(
}
reserve_at_least(&mut buf, buf_size);

let n = match ready!(Pin::new(&mut f).poll_read_buf(cx, &mut buf)) {
let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
Ok(n) => n as u64,
Err(err) => {
tracing::debug!("file read error: {}", err);
Expand Down
14 changes: 9 additions & 5 deletions src/filters/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use hyper::Body;
use pin_project::pin_project;
use serde::Serialize;
use serde_json;
use tokio::time::{self, Delay};
use tokio::time::{self, Sleep};

use self::sealed::{
BoxedServerSentEvent, EitherServerSentEvent, SseError, SseField, SseFormat, SseWrapper,
Expand Down Expand Up @@ -467,7 +467,7 @@ impl KeepAlive {
S::Ok: ServerSentEvent + Send,
S::Error: StdError + Send + Sync + 'static,
{
let alive_timer = time::delay_for(self.max_interval);
let alive_timer = time::sleep(self.max_interval);
SseKeepAlive {
event_stream,
comment_text: self.comment_text,
Expand All @@ -484,7 +484,8 @@ struct SseKeepAlive<S> {
event_stream: S,
comment_text: Cow<'static, str>,
max_interval: Duration,
alive_timer: Delay,
#[pin]
alive_timer: Sleep,
}

#[doc(hidden)]
Expand All @@ -505,7 +506,7 @@ where
let max_interval = keep_interval
.into()
.unwrap_or_else(|| Duration::from_secs(15));
let alive_timer = time::delay_for(max_interval);
let alive_timer = time::sleep(max_interval);
SseKeepAlive {
event_stream,
comment_text: Cow::Borrowed(""),
Expand All @@ -529,6 +530,7 @@ where
/// use std::convert::Infallible;
/// use futures::StreamExt;
/// use tokio::time::interval;
/// use tokio_stream::wrappers::IntervalStream;
/// use warp::{Filter, Stream, sse::ServerSentEvent};
///
/// // create server-sent event
Expand All @@ -541,7 +543,9 @@ where
/// .and(warp::get())
/// .map(|| {
/// let mut counter: u64 = 0;
/// let event_stream = interval(Duration::from_secs(15)).map(move |_| {
/// let interval = interval(Duration::from_secs(15));
/// let stream = IntervalStream::new(interval);
/// let event_stream = stream.map(move |_| {
/// counter += 1;
/// sse_counter(counter)
/// });
Expand Down
32 changes: 22 additions & 10 deletions src/filters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ use std::pin::Pin;
use std::task::{Context, Poll};

use super::{body, header};
use crate::filter::{Filter, One};
use crate::filter::{filter_fn_one, Filter, One};
use crate::reject::Rejection;
use crate::reply::{Reply, Response};
use futures::{future, ready, FutureExt, Sink, Stream, TryFutureExt};
use headers::{Connection, HeaderMapExt, SecWebsocketAccept, SecWebsocketKey, Upgrade};
use http;
use hyper::upgrade::OnUpgrade;
use tokio_tungstenite::{
tungstenite::protocol::{self, WebSocketConfig},
WebSocketStream,
Expand Down Expand Up @@ -58,18 +59,23 @@ pub fn ws() -> impl Filter<Extract = One<Ws>, Error = Rejection> + Copy {
//.and(header::exact2(SecWebsocketVersion::V13))
.and(header::header2::<SecWebsocketKey>())
.and(body::body())
.map(move |key: SecWebsocketKey, body: ::hyper::Body| Ws {
body,
config: None,
key,
})
.and(on_upgrade())
.map(
move |key: SecWebsocketKey, body: ::hyper::Body, on_upgrade: Option<OnUpgrade>| Ws {
body,
config: None,
key,
on_upgrade,
},
)
}

/// Extracted by the [`ws`](ws) filter, and used to finish an upgrade.
pub struct Ws {
body: ::hyper::Body,
config: Option<WebSocketConfig>,
key: SecWebsocketKey,
on_upgrade: Option<OnUpgrade>,
}

impl Ws {
Expand Down Expand Up @@ -134,10 +140,11 @@ where
fn into_response(self) -> Response {
let on_upgrade = self.on_upgrade;
let config = self.ws.config;
let fut = self
.ws
.body
.on_upgrade()
let mut resp = Response::new(self.ws.body);
if let Some(on_upgrade) = self.ws.on_upgrade {
resp.extensions_mut().insert(on_upgrade);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you already have the extension, you don't need to insert it here to remove it again. (on_upgrade is the same as fut).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@seanmonstar I think I see what you mean, and try to simplify it to the best of my ability. Please have a look.

}
let fut = hyper::upgrade::on(resp)
.and_then(move |upgraded| {
tracing::trace!("websocket upgrade complete");
WebSocket::from_raw_socket(upgraded, protocol::Role::Server, config).map(Ok)
Expand All @@ -163,6 +170,11 @@ where
}
}

// Extracts OnUpgrade state from the route.
fn on_upgrade() -> impl Filter<Extract = (Option<OnUpgrade>,), Error = Rejection> + Copy {
filter_fn_one(|route| future::ready(Ok(route.extensions_mut().remove::<OnUpgrade>())))
}

/// A websocket `Stream` and `Sink`, provided to `ws` filters.
///
/// Ping messages sent from the client will be handled internally by replying with a Pong message.
Expand Down
3 changes: 1 addition & 2 deletions src/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,10 @@ impl Route {
self.req.extensions()
}

/*
#[cfg(feature = "websocket")]
pub(crate) fn extensions_mut(&mut self) -> &mut http::Extensions {
self.req.extensions_mut()
}
*/

pub(crate) fn uri(&self) -> &http::Uri {
self.req.uri()
Expand Down
Loading