Skip to content

Commit

Permalink
fix(volo-grpc): add TokioTimer for server and client of hyper (#290)
Browse files Browse the repository at this point in the history
* chore(volo-http): bump volo-http

* fix(volo-grpc): add `timer` for hyper client and server

Both server and client of `hyper` need to set a timer, otherwise they
will **panic** when timeout occurs.

But `hyper` does not provide an implementation of the trait `Timer`,
and even if `hyper-util` does provide one, updates are not released in
new versions.

To solve this problem, I copied `TokioTimer` and `TokioSleep` from the
latest repository of `hyper-util` and put them into `time.rs`.

`timer.rs` will be removed once the `hyper-util` 0.1.2 released, and
`hyper_util::rt::TokioTimer` will be used directly.

Refer: hyperium/hyper-util#73

---------

Signed-off-by: Yu Li <wfly1998@sina.com>
  • Loading branch information
yukiiiteru authored Dec 20, 2023
1 parent ce632c9 commit a651ca2
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions volo-grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod request;
pub mod response;
pub mod server;
pub mod status;
mod timer;
pub mod transport;

pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
Expand Down
1 change: 1 addition & 0 deletions volo-grpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ impl<L> Server<L> {
// init server
let mut server = http2::Builder::new(TokioExecutor::new());
server.initial_stream_window_size(self.http2_config.init_stream_window_size)
.timer(crate::timer::TokioTimer::new())
.initial_connection_window_size(self.http2_config.init_connection_window_size)
.adaptive_window(self.http2_config.adaptive_window)
.max_concurrent_streams(self.http2_config.max_concurrent_streams)
Expand Down
78 changes: 78 additions & 0 deletions volo-grpc/src/timer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// The `TokioTimer` and `TokioSleep` are copied from `hyper-util`.
//
// Since these are very important, but the new version of `hyper-util` containing these
// has not been released yet, I copied it temporarily.
//
// This file will be removed once the `hyper-util` 0.1.2 released, and
// `hyper_util::rt::TokioTimer` will be used directly.
//
// Refer: https://github.com/hyperium/hyper-util/pull/73

use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};

use hyper::rt::{Sleep, Timer};
use pin_project::pin_project;

/// A Timer that uses the tokio runtime.
#[non_exhaustive]
#[derive(Default, Clone, Debug)]
pub struct TokioTimer;

// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
#[pin_project]
#[derive(Debug)]
struct TokioSleep {
#[pin]
inner: tokio::time::Sleep,
}

// ==== impl TokioTimer =====

impl Timer for TokioTimer {
fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep(duration),
})
}

fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
Box::pin(TokioSleep {
inner: tokio::time::sleep_until(deadline.into()),
})
}

fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
sleep.reset(new_deadline)
}
}
}

impl TokioTimer {
/// Create a new TokioTimer
pub fn new() -> Self {
Self {}
}
}

impl Future for TokioSleep {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.project().inner.poll(cx)
}
}

impl Sleep for TokioSleep {}

impl TokioSleep {
fn reset(self: Pin<&mut Self>, deadline: Instant) {
self.project().inner.as_mut().reset(deadline.into());
}
}
2 changes: 2 additions & 0 deletions volo-grpc/src/transport/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ impl<U> ClientTransport<U> {
);
let mut http_client = http2::Builder::new(TokioExecutor::new());
http_client
.timer(crate::timer::TokioTimer::new())
.initial_stream_window_size(http2_config.init_stream_window_size)
.initial_connection_window_size(http2_config.init_connection_window_size)
.max_frame_size(http2_config.max_frame_size)
Expand Down Expand Up @@ -81,6 +82,7 @@ impl<U> ClientTransport<U> {
);
let mut http_client = http2::Builder::new(TokioExecutor::new());
http_client
.timer(crate::timer::TokioTimer::new())
.initial_stream_window_size(http2_config.init_stream_window_size)
.initial_connection_window_size(http2_config.init_connection_window_size)
.max_frame_size(http2_config.max_frame_size)
Expand Down

0 comments on commit a651ca2

Please sign in to comment.