Skip to content

Commit

Permalink
Upgrade to Tokio 0.3
Browse files Browse the repository at this point in the history
Signed-off-by: Arve Knudsen <arve.knudsen@gmail.com>
  • Loading branch information
janpetschexain authored and aknuds1 committed Nov 29, 2020
1 parent 5d1f546 commit 5d2937c
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 34 deletions.
15 changes: 7 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ edition = "2018"
all-features = true

[dependencies]
async-compression = { version = "0.3.1", features = ["brotli", "deflate", "gzip", "stream"], optional = true }
bytes = "0.5"
async-compression = { git = "https://github.com/aknuds1/async-compression", rev = "e4d903b8ff9972f056f714c61bd9d0f3321a4463", features = ["brotli", "deflate", "gzip", "stream"], optional = true }
bytes = "0.6"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
headers = "0.3"
http = "0.2"
hyper = { version = "0.13", features = ["stream"] }
hyper = { git = "https://github.com/hyperium/hyper.git", rev = "1ba2a141a6f8736446ff4a0111df347c0dc66f6c", features = ["stream", "server", "http1", "http2", "tcp", "client"] }
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0"
Expand All @@ -31,23 +31,23 @@ scoped-tls = "1.0"
serde = "1.0"
serde_json = "1.0"
serde_urlencoded = "0.7"
tokio = { version = "0.2", features = ["fs", "stream", "sync", "time"] }
tokio = { version = "0.3", features = ["fs", "stream", "sync", "time"] }
tracing = { version = "0.1", default-features = false, features = ["log", "std"] }
tracing-futures = { version = "0.2", default-features = false, features = ["std-future"] }
tower-service = "0.3"
# tls is enabled by default, we don't want that yet
tokio-tungstenite = { version = "0.11", default-features = false, optional = true }
tokio-tungstenite = { git = "https://github.com/snapview/tokio-tungstenite.git", rev = "71a5b72123db32b318d48964948fc76c943f1548", default-features = false, optional = true }
percent-encoding = "2.1"
pin-project = "1.0"
tokio-rustls = { version = "0.14", optional = true }
tokio-rustls = { version = "0.21", optional = true }

[dev-dependencies]
pretty_env_logger = "0.4"
tracing-subscriber = "0.2.7"
tracing-log = "0.1"
serde_derive = "1.0"
handlebars = "3.0.0"
tokio = { version = "0.2", features = ["macros"] }
tokio = { version = "0.3", features = ["macros", "rt-multi-thread"] }
listenfd = "0.3"

[features]
Expand Down Expand Up @@ -78,7 +78,6 @@ required-features = ["compression"]

[[example]]
name = "unix_socket"
required-features = ["tokio/uds"]

[[example]]
name = "websockets"
Expand Down
2 changes: 1 addition & 1 deletion examples/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async fn main() {
}

async fn sleepy(Seconds(seconds): Seconds) -> Result<impl warp::Reply, Infallible> {
tokio::time::delay_for(Duration::from_secs(seconds)).await;
tokio::time::sleep(Duration::from_secs(seconds)).await;
Ok(format!("I waited {} seconds!", seconds))
}

Expand Down
5 changes: 2 additions & 3 deletions examples/unix_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use tokio::net::UnixListener;
async fn main() {
pretty_env_logger::init();

let mut listener = UnixListener::bind("/tmp/warp.sock").unwrap();
let incoming = listener.incoming();
let listener = UnixListener::bind("/tmp/warp.sock").unwrap();
warp::serve(warp::fs::dir("examples/dir"))
.run_incoming(incoming)
.run_incoming(listener)
.await;
}
2 changes: 1 addition & 1 deletion src/filters/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::fmt;
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::{buf::BufExt, Buf, Bytes};
use bytes::{Buf, Bytes};
use futures::{future, ready, Stream, TryFutureExt};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
Expand Down
39 changes: 35 additions & 4 deletions src/filters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use std::io;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::task::{Poll, Context};
use std::mem::MaybeUninit;

use bytes::{Bytes, BytesMut};
use bytes::{Bytes, BytesMut, BufMut};
use futures::future::Either;
use futures::{future, ready, stream, FutureExt, Stream, StreamExt, TryFutureExt};
use headers::{
Expand All @@ -22,7 +23,7 @@ use hyper::Body;
use mime_guess;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
use tokio::io::AsyncRead;
use tokio::io::{AsyncSeekExt, AsyncRead, ReadBuf};

use crate::filter::{Filter, FilterClone, One};
use crate::reject::{self, Rejection};
Expand Down Expand Up @@ -419,7 +420,7 @@ fn file_stream(
}
reserve_at_least(&mut buf, buf_size);

let n = match ready!(Pin::new(&mut f).poll_read_buf(cx, &mut buf)) {
let n = match ready!(poll_read_buf(Pin::new(&mut f), cx, &mut buf)) {
Ok(n) => n as u64,
Err(err) => {
tracing::debug!("file read error: {}", err);
Expand Down Expand Up @@ -524,3 +525,33 @@ mod tests {
assert_eq!(buf.capacity(), cap);
}
}

fn poll_read_buf<T: AsyncRead, B: BufMut>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut B,
) -> Poll<io::Result<usize>> {
if !buf.has_remaining_mut() {
return Poll::Ready(Ok(0));
}

let n = {
let dst = buf.bytes_mut();
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
ready!(io.poll_read(cx, &mut buf)?);

// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
buf.filled().len()
};

// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}

Poll::Ready(Ok(n))
}
8 changes: 4 additions & 4 deletions src/filters/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ use hyper::Body;
use pin_project::pin_project;
use serde::Serialize;
use serde_json;
use tokio::time::{self, Delay};
use tokio::time::{self, Sleep};

use self::sealed::{
BoxedServerSentEvent, EitherServerSentEvent, SseError, SseField, SseFormat, SseWrapper,
Expand Down Expand Up @@ -467,7 +467,7 @@ impl KeepAlive {
S::Ok: ServerSentEvent + Send,
S::Error: StdError + Send + Sync + 'static,
{
let alive_timer = time::delay_for(self.max_interval);
let alive_timer = time::sleep(self.max_interval);
SseKeepAlive {
event_stream,
comment_text: self.comment_text,
Expand All @@ -484,7 +484,7 @@ struct SseKeepAlive<S> {
event_stream: S,
comment_text: Cow<'static, str>,
max_interval: Duration,
alive_timer: Delay,
alive_timer: Sleep,
}

#[doc(hidden)]
Expand All @@ -505,7 +505,7 @@ where
let max_interval = keep_interval
.into()
.unwrap_or_else(|| Duration::from_secs(15));
let alive_timer = time::delay_for(max_interval);
let alive_timer = time::sleep(max_interval);
SseKeepAlive {
event_stream,
comment_text: Cow::Borrowed(""),
Expand Down
5 changes: 1 addition & 4 deletions src/filters/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,7 @@ where
fn into_response(self) -> Response {
let on_upgrade = self.on_upgrade;
let config = self.ws.config;
let fut = self
.ws
.body
.on_upgrade()
let fut = hyper::upgrade::on(Response::new(self.ws.body))
.and_then(move |upgraded| {
tracing::trace!("websocket upgrade complete");
WebSocket::from_raw_socket(upgraded, protocol::Role::Server, config).map(Ok)
Expand Down
50 changes: 50 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,26 +416,76 @@ where
// TLS config methods

/// Specify the file path to read the private key.
///
/// *This function requires the `"tls"` feature.*
pub fn key_path(self, path: impl AsRef<Path>) -> Self {
self.with_tls(|tls| tls.key_path(path))
}

/// Specify the file path to read the certificate.
///
/// *This function requires the `"tls"` feature.*
pub fn cert_path(self, path: impl AsRef<Path>) -> Self {
self.with_tls(|tls| tls.cert_path(path))
}

/// Specify the file path to read the trust anchor for optional client authentication.
///
/// Anonymous and authenticated clients will be accepted. If no trust anchor is provided by any
/// of the `client_auth_` methods, then client authentication is disabled by default.
///
/// *This function requires the `"tls"` feature.*
pub fn client_auth_optional_path(self, path: impl AsRef<Path>) -> Self {
self.with_tls(|tls| tls.client_auth_optional_path(path))
}

/// Specify the file path to read the trust anchor for required client authentication.
///
/// Only authenticated clients will be accepted. If no trust anchor is provided by any of the
/// `client_auth_` methods, then client authentication is disabled by default.
///
/// *This function requires the `"tls"` feature.*
pub fn client_auth_required_path(self, path: impl AsRef<Path>) -> Self {
self.with_tls(|tls| tls.client_auth_required_path(path))
}

/// Specify the in-memory contents of the private key.
///
/// *This function requires the `"tls"` feature.*
pub fn key(self, key: impl AsRef<[u8]>) -> Self {
self.with_tls(|tls| tls.key(key.as_ref()))
}

/// Specify the in-memory contents of the certificate.
///
/// *This function requires the `"tls"` feature.*
pub fn cert(self, cert: impl AsRef<[u8]>) -> Self {
self.with_tls(|tls| tls.cert(cert.as_ref()))
}

/// Specify the in-memory contents of the trust anchor for optional client authentication.
///
/// Anonymous and authenticated clients will be accepted. If no trust anchor is provided by any
/// of the `client_auth_` methods, then client authentication is disabled by default.
///
/// *This function requires the `"tls"` feature.*
pub fn client_auth_optional(self, trust_anchor: impl AsRef<[u8]>) -> Self {
self.with_tls(|tls| tls.client_auth_optional(trust_anchor.as_ref()))
}

/// Specify the in-memory contents of the trust anchor for required client authentication.
///
/// Only authenticated clients will be accepted. If no trust anchor is provided by any of the
/// `client_auth_` methods, then client authentication is disabled by default.
///
/// *This function requires the `"tls"` feature.*
pub fn client_auth_required(self, trust_anchor: impl AsRef<[u8]>) -> Self {
self.with_tls(|tls| tls.client_auth_required(trust_anchor.as_ref()))
}

/// Specify the DER-encoded OCSP response.
///
/// *This function requires the `"tls"` feature.*
pub fn ocsp_resp(self, resp: impl AsRef<[u8]>) -> Self {
self.with_tls(|tls| tls.ocsp_resp(resp.as_ref()))
}
Expand Down
2 changes: 1 addition & 1 deletion src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ impl WsBuilder {
let upgrade = ::hyper::Client::builder()
.build(AddrConnect(addr))
.request(req)
.and_then(|res| res.into_body().on_upgrade());
.and_then(|res| hyper::upgrade::on(res));

let upgraded = match upgrade.await {
Ok(up) => {
Expand Down
Loading

0 comments on commit 5d2937c

Please sign in to comment.