Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update hyper to 0.14 and tokio/bytes to 1.0 #511

Merged
merged 4 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
update hyper to 0.14 and tokio/bytes to 1.0
  • Loading branch information
msrd0 committed Dec 24, 2020
commit 02ec42e5c46c45fa2d68388decae8f63b321bc16
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ members = [
# example_contribution_template
"examples/example_contribution_template/name",

# websocket
# TODO: Re-enable when tokio-tungstenite is updated
"examples/websocket",
# "examples/websocket",

# finalizer
"examples/finalizers/",
Expand Down
2 changes: 1 addition & 1 deletion examples/diesel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,4 @@ serde_derive = "1.0"

[dev-dependencies]
diesel_migrations = "1.4.0"
tokio = { version = "0.2.6", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
4 changes: 2 additions & 2 deletions examples/diesel/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ mod tests {
#[test]
fn get_empty_products() {
let repo = Repo::with_test_transactions(DATABASE_URL);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
let _ = runtime.block_on(repo.run(|conn| embedded_migrations::run(&conn)));
let test_server = TestServer::new(router(repo)).unwrap();
let response = test_server
Expand All @@ -169,7 +169,7 @@ mod tests {
#[test]
fn create_and_retrieve_product() {
let repo = Repo::with_test_transactions(DATABASE_URL);
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
let _ = runtime.block_on(repo.run(|conn| embedded_migrations::run(&conn)));
let test_server = TestServer::new(router(repo)).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion examples/handlers/simple_async_handlers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ mime = "0.3"
futures = "0.3.1"
serde = "1.0"
serde_derive = "1.0"
tokio = { version = "0.2.6", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
37 changes: 15 additions & 22 deletions examples/handlers/simple_async_handlers/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,16 @@ extern crate serde_derive;

use futures::prelude::*;
use std::pin::Pin;
use std::time::{Duration, Instant};

use gotham::hyper::StatusCode;
use std::time::Duration;

use gotham::handler::HandlerFuture;
use gotham::helpers::http::response::create_response;
use gotham::hyper::StatusCode;
use gotham::router::builder::DefineSingleRoute;
use gotham::router::builder::{build_simple_router, DrawRoutes};
use gotham::router::Router;
use gotham::state::{FromState, State};

use tokio::time::delay_until;

type SleepFuture = Pin<Box<dyn Future<Output = Vec<u8>> + Send>>;
use tokio::time::sleep;

#[derive(Deserialize, StateData, StaticResponseExtender)]
struct QueryStringExtractor {
Expand All @@ -35,6 +31,7 @@ fn get_duration(seconds: u64) -> Duration {
fn get_duration(seconds: u64) -> Duration {
Duration::from_millis(seconds)
}

/// All this function does is return a future that resolves after a number of
/// seconds, with a Vec<u8> that tells you how long it slept for.
///
Expand All @@ -49,15 +46,12 @@ fn get_duration(seconds: u64) -> Duration {
/// web apis) can be coerced into returning futures that yield useful data,
/// so the patterns that you learn in this example should be applicable to
/// real world problems.
fn sleep(seconds: u64) -> SleepFuture {
let when = Instant::now() + get_duration(seconds);
let delay = delay_until(when.into()).map(move |_| {
format!("slept for {} seconds\n", seconds)
.as_bytes()
.to_vec()
});

delay.boxed()
async fn sleep_for(seconds: u64) -> Vec<u8> {
sleep(get_duration(seconds)).await;

format!("slept for {} seconds\n", seconds)
.as_bytes()
.to_vec()
}

/// This handler sleeps for the requested number of seconds, using the `sleep()`
Expand All @@ -67,7 +61,7 @@ fn sleep_handler(mut state: State) -> Pin<Box<HandlerFuture>> {
println!("sleep for {} seconds once: starting", seconds);

// Here, we call our helper function that returns a future.
let sleep_future = sleep(seconds);
let sleep_future = sleep_for(seconds);

// Here, we convert the future from `sleep()` into the form that Gotham expects.
// We have to use .then() rather than .and_then() because we need to coerce both
Expand Down Expand Up @@ -95,16 +89,15 @@ fn loop_handler(mut state: State) -> Pin<Box<HandlerFuture>> {

// Here, we create a stream of Ok(_) that's as long as we need, and use fold
// to loop over it asyncronously, accumulating the return values from sleep().
let sleep_future: SleepFuture = futures::stream::iter(0..seconds)
.fold(Vec::new(), move |mut accumulator, _| {
let sleep_future =
futures::stream::iter(0..seconds).fold(Vec::new(), move |mut accumulator, _| {
// Do the sleep(), and append the result to the accumulator so that it can
// be returned.
sleep(1).map(move |body| {
sleep_for(1).map(move |body| {
accumulator.extend(body);
accumulator
})
})
.boxed();
});

// This bit is the same as the bit in the first example.
sleep_future
Expand Down
2 changes: 1 addition & 1 deletion examples/handlers/simple_async_handlers_await/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ mime = "0.3"
futures = "0.3.1"
serde = "1.0"
serde_derive = "1.0"
tokio = "0.2.9"
tokio = "1.0"
32 changes: 12 additions & 20 deletions examples/handlers/simple_async_handlers_await/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
//! A basic example showing the request components

use futures::prelude::*;
use std::pin::Pin;
use std::time::{Duration, Instant};

use gotham::hyper::{Body, StatusCode};
use std::time::Duration;

use gotham::handler::{HandlerError, HandlerResult, IntoResponse};
use gotham::helpers::http::response::create_response;
use gotham::hyper::{Body, StatusCode};
use gotham::router::builder::DefineSingleRoute;
use gotham::router::builder::{build_simple_router, DrawRoutes};
use gotham::router::Router;
use gotham::state::{FromState, State};
use gotham_derive::{StateData, StaticResponseExtender};
use serde_derive::Deserialize;

use tokio::time::delay_until;

type SleepFuture = Pin<Box<dyn Future<Output = Vec<u8>> + Send>>;
use tokio::time::sleep;

#[derive(Deserialize, StateData, StaticResponseExtender)]
struct QueryStringExtractor {
Expand All @@ -33,6 +27,7 @@ fn get_duration(seconds: u64) -> Duration {
fn get_duration(seconds: u64) -> Duration {
Duration::from_millis(seconds)
}

/// All this function does is return a future that resolves after a number of
/// seconds, with a Vec<u8> that tells you how long it slept for.
///
Expand All @@ -47,15 +42,12 @@ fn get_duration(seconds: u64) -> Duration {
/// web apis) can be coerced into returning futures that yield useful data,
/// so the patterns that you learn in this example should be applicable to
/// real world problems.
fn sleep(seconds: u64) -> SleepFuture {
let when = Instant::now() + get_duration(seconds);
let delay = delay_until(when.into()).map(move |_| {
format!("slept for {} seconds\n", seconds)
.as_bytes()
.to_vec()
});

delay.boxed()
async fn sleep_for(seconds: u64) -> Vec<u8> {
sleep(get_duration(seconds)).await;

format!("slept for {} seconds\n", seconds)
.as_bytes()
.to_vec()
}

/// This handler sleeps for the requested number of seconds, using the `sleep()`
Expand All @@ -65,7 +57,7 @@ async fn sleep_handler(state: &mut State) -> Result<impl IntoResponse, HandlerEr
println!("sleep for {} seconds once: starting", seconds);
// Here, we call the sleep function. Note that this step doesn't block:
// it just sets up the timer so that we can use it later.
let sleep_future = sleep(seconds);
let sleep_future = sleep_for(seconds);

// Here is where the serious sleeping happens. We yield execution of
// this block until sleep_future is resolved.
Expand All @@ -92,7 +84,7 @@ async fn loop_handler(mut state: State) -> HandlerResult {
// logic in.
let mut accumulator = Vec::new();
for _ in 0..seconds {
let body = sleep(1).await;
let body = sleep_for(1).await;
accumulator.extend(body)
}

Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world_until/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2018"
gotham = { path = "../../gotham" }

futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }
tokio = { version = "1.0", features = ["full"] }
mime = "0.3"

[target.'cfg(unix)'.dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion examples/hello_world_until/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ mod tests {
let uri_parsed = uri.parse().unwrap();
let work = client.get(uri_parsed);

let mut rt = Runtime::new().unwrap();
let rt = Runtime::new().unwrap();

match rt.block_on(work) {
Ok(req) => {
Expand Down
6 changes: 3 additions & 3 deletions examples/websocket/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,9 @@ mod test {
);

server.run_future(async move {
let response: Response<_> = response.into();
let upgraded = response
.into_body()
let response: Response<_> = response.into();
let upgraded = response
.into_body()
.on_upgrade()
.await
.expect("Failed to upgrade client websocket.");
Expand Down
8 changes: 4 additions & 4 deletions gotham/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ rustls = ["tokio-rustls"]

[dependencies]
log = "0.4"
hyper = "0.13.1"
hyper = { version = "0.14", features = ["full"] }
serde = "1.0"
serde_derive = "1.0"
bincode = "1.0"
mime = "0.3.15"
mime_guess = "2.0.1"
futures = "0.3.1"
tokio = { version = "0.2", features = ["tcp", "rt-threaded", "time", "fs", "io-util"] }
bytes = "0.5"
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "time", "fs", "io-util"] }
bytes = "1.0"
borrow-bag = { path = "../misc/borrow_bag", version = "1.0" }
percent-encoding = "2.1"
pin-project = "1.0.0"
Expand All @@ -47,7 +47,7 @@ http = "0.2"
httpdate = "0.3"
itertools = "0.9.0"
anyhow = "1.0"
tokio-rustls = { version = "0.14.0", optional = true }
tokio-rustls = { version = "0.22", optional = true }

[dev-dependencies]
gotham_derive = { path = "../gotham_derive" }
Expand Down
27 changes: 20 additions & 7 deletions gotham/src/handler/assets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use mime::{self, Mime};
use mime_guess::from_path;
use serde_derive::Deserialize;
use tokio::fs::File;
use tokio::io::AsyncRead;
use tokio::io::{AsyncRead, ReadBuf};

use self::accepted_encoding::accepted_encodings;
use crate::handler::{Handler, HandlerError, HandlerFuture, NewHandler};
Expand All @@ -31,6 +31,7 @@ use std::convert::From;
use std::fs::Metadata;
use std::io;
use std::iter::FromIterator;
use std::mem::MaybeUninit;
use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::time::UNIX_EPOCH;
Expand Down Expand Up @@ -366,8 +367,9 @@ impl StaticResponseExtender for FilePathExtractor {
}

// Creates a Stream from the given file, for streaming as part of the Response.
// Borrowed from Warp https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs
// Thanks @seanmonstar.
// Inspired by Warp https://github.com/seanmonstar/warp/blob/master/src/filters/fs.rs
// Inspired by tokio https://github.com/tokio-rs/tokio/blob/master/tokio/src/io/util/read_buf.rs
// Thanks @seanmonstar and @carllerche.
fn file_stream(
mut f: File,
buf_size: usize,
Expand All @@ -382,20 +384,31 @@ fn file_stream(
buf.reserve(buf_size);
}

let read = Pin::new(&mut f).poll_read_buf(cx, &mut buf);
let n = ready!(read).map_err(|err| {
let dst = buf.chunk_mut();
let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit<u8>]) };
let mut read_buf = ReadBuf::uninit(dst);
let read = Pin::new(&mut f).poll_read(cx, &mut read_buf);
ready!(read).map_err(|err| {
debug!("file read error: {}", err);
err
})? as u64;
})?;

if n == 0 {
if read_buf.filled().is_empty() {
debug!("file read found EOF before expected length");
return Poll::Ready(Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"file read found EOF before expected length",
))));
}

let n = read_buf.filled().len();
// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}
let n = n as u64;

let chunk = if n > len {
let chunk = buf.split_to(len as usize);
len = 0;
Expand Down
7 changes: 3 additions & 4 deletions gotham/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ pub use plain::*;
pub use tls::start as start_with_tls;

fn new_runtime(threads: usize) -> Runtime {
runtime::Builder::new()
.threaded_scheduler()
.core_threads(threads)
runtime::Builder::new_multi_thread()
.worker_threads(threads)
.thread_name("gotham-worker")
.enable_all()
.build()
Expand All @@ -99,7 +98,7 @@ where
/// the socket as necessary. Errors returned by this function will be ignored and the connection
/// will be dropped if the future returned by the wrapper resolves to an error.
pub async fn bind_server<'a, NH, F, Wrapped, Wrap>(
mut listener: TcpListener,
listener: TcpListener,
new_handler: NH,
wrap: Wrap,
) -> !
Expand Down
2 changes: 1 addition & 1 deletion gotham/src/plain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ where
NH: NewHandler + 'static,
A: ToSocketAddrs + 'static + Send,
{
let mut runtime = new_runtime(threads);
let runtime = new_runtime(threads);
let _ = runtime.block_on(async { init_server(addr, new_handler).await });
}

Expand Down
9 changes: 5 additions & 4 deletions gotham/src/plain/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use futures::prelude::*;
use hyper::client::Client;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio::time::{delay_for, Delay};
use tokio::time::{sleep, Sleep};

use hyper::service::Service;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -70,9 +70,10 @@ impl Clone for TestServer {
}

impl test::Server for TestServer {
fn request_expiry(&self) -> Delay {
fn request_expiry(&self) -> Sleep {
let runtime = self.data.runtime.write().unwrap();
runtime.enter(|| delay_for(Duration::from_secs(self.data.timeout)))
let _guard = runtime.enter();
sleep(Duration::from_secs(self.data.timeout))
}

fn run_future<F, O>(&self, future: F) -> O
Expand Down Expand Up @@ -108,7 +109,7 @@ impl TestServer {
where
NH::Instance: UnwindSafe,
{
let mut runtime = Runtime::new()?;
let runtime = Runtime::new()?;
// TODO: Fix this into an async flow
let listener = runtime.block_on(TcpListener::bind("127.0.0.1:0".parse::<SocketAddr>()?))?;
let addr = listener.local_addr()?;
Expand Down
Loading