Skip to content

Commit

Permalink
server: add serve and serve_with_graceful_shutdown helpers (#1382)
Browse files Browse the repository at this point in the history
* add `serve` and `serve_with_graceful_shutdown`

* tokio::net::TcpStream -> impl tokio::io::ReadWrite

* address grumbles
  • Loading branch information
niklasad1 authored Jun 6, 2024
1 parent 038a77f commit 8433f48
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 182 deletions.
36 changes: 3 additions & 33 deletions examples/examples/jsonrpsee_as_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,14 @@ use std::net::SocketAddr;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures::future::{self, Either};
use futures::FutureExt;
use hyper::header::AUTHORIZATION;
use hyper::HeaderMap;
use hyper_util::rt::{TokioExecutor, TokioIo};
use jsonrpsee::core::async_trait;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::middleware::rpc::{ResponseFuture, RpcServiceBuilder, RpcServiceT};
use jsonrpsee::server::{stop_channel, ServerHandle, StopHandle, TowerServiceBuilder};
use jsonrpsee::server::{serve_with_graceful_shutdown, stop_channel, ServerHandle, StopHandle, TowerServiceBuilder};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Request};
use jsonrpsee::ws_client::{HeaderValue, WsClientBuilder};
use jsonrpsee::{MethodResponse, Methods};
Expand Down Expand Up @@ -147,8 +145,6 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server(metrics: Metrics) -> anyhow::Result<ServerHandle> {
use hyper::service::service_fn;

let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 9944))).await?;

// This state is cloned for every connection
Expand Down Expand Up @@ -201,7 +197,7 @@ async fn run_server(metrics: Metrics) -> anyhow::Result<ServerHandle> {
};
let per_conn2 = per_conn.clone();

let svc = service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
let svc = tower::service_fn(move |req: hyper::Request<hyper::body::Incoming>| {
let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };
let PerConnection { methods, stop_handle, metrics, svc_builder } = per_conn2.clone();
Expand Down Expand Up @@ -262,33 +258,7 @@ async fn run_server(metrics: Metrics) -> anyhow::Result<ServerHandle> {
}
});

let per_conn = per_conn.clone();
tokio::spawn(async move {
let stop_handle2 = per_conn.stop_handle.clone();

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
let stopped = stop_handle2.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
});
tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
}
});

Expand Down
35 changes: 5 additions & 30 deletions examples/examples/jsonrpsee_server_low_level_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex};

use futures::future::{self, BoxFuture, Either};
use futures::future::BoxFuture;
use futures::FutureExt;
use hyper_util::rt::{TokioExecutor, TokioIo};
use jsonrpsee::core::async_trait;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::proc_macros::rpc;
use jsonrpsee::server::middleware::rpc::RpcServiceT;
use jsonrpsee::server::{
http, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder, ServerConfig, ServerHandle, StopHandle,
http, serve_with_graceful_shutdown, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder,
ServerConfig, ServerHandle, StopHandle,
};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Request};
use jsonrpsee::ws_client::WsClientBuilder;
Expand Down Expand Up @@ -158,8 +158,6 @@ async fn main() -> anyhow::Result<()> {
}

async fn run_server() -> anyhow::Result<ServerHandle> {
use hyper::service::service_fn;

// Construct our SocketAddr to listen on...
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 9944))).await?;

Expand Down Expand Up @@ -224,7 +222,7 @@ async fn run_server() -> anyhow::Result<ServerHandle> {
// Create a service handler.
let stop_handle2 = per_conn.stop_handle.clone();
let per_conn = per_conn.clone();
let svc = service_fn(move |req| {
let svc = tower::service_fn(move |req| {
let PerConnection {
methods,
stop_handle,
Expand Down Expand Up @@ -307,30 +305,7 @@ async fn run_server() -> anyhow::Result<ServerHandle> {
});

// Upgrade the connection to a HTTP service.
tokio::spawn(async move {
let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
let stopped = stop_handle2.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
});
tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle2.shutdown()));
}
});

Expand Down
35 changes: 2 additions & 33 deletions examples/examples/ws_dual_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,8 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use futures::future::{self, Either};
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::service::TowerToHyperService;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::server::{stop_channel, ServerHandle};
use jsonrpsee::server::{serve_with_graceful_shutdown, stop_channel, ServerHandle};
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::{rpc_params, RpcModule};
use std::net::SocketAddr;
Expand Down Expand Up @@ -111,36 +108,8 @@ async fn run_server() -> anyhow::Result<(ServerHandle, Addrs)> {
_ = stop_hdl.clone().shutdown() => break,
};

// Clone the service and stop handle to be moved into the spawn
// below.
let svc = svc.clone();
let stop_hdl2 = stop_hdl.clone();

// Spawn a new task to serve each respective (Hyper) connection.
tokio::spawn(async move {
let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(stream), TowerToHyperService::new(svc));
let stopped = stop_hdl2.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
});
tokio::spawn(serve_with_graceful_shutdown(stream, svc.clone(), stop_hdl.clone().shutdown()));
}
});

Expand Down
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,6 @@ pub use tracing;
pub use jsonrpsee_core::http_helpers::{Body as HttpBody, Request as HttpRequest, Response as HttpResponse};
pub use transport::http;
pub use transport::ws;
pub use utils::{serve, serve_with_graceful_shutdown};

pub(crate) const LOG_TARGET: &str = "jsonrpsee-server";
30 changes: 3 additions & 27 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// # Examples
///
/// ```no_run
/// use jsonrpsee_server::{Methods, ServerHandle, ws, stop_channel};
/// use jsonrpsee_server::{Methods, ServerHandle, ws, stop_channel, serve_with_graceful_shutdown};
/// use tower::Service;
/// use std::{error::Error as StdError, net::SocketAddr};
/// use futures_util::future::{self, Either};
Expand Down Expand Up @@ -820,7 +820,7 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// let svc_builder2 = svc_builder.clone();
/// let methods2 = methods.clone();
///
/// let svc = hyper::service::service_fn(move |req| {
/// let svc = tower::service_fn(move |req| {
/// let stop_handle = stop_handle2.clone();
/// let svc_builder = svc_builder2.clone();
/// let methods = methods2.clone();
Expand All @@ -846,32 +846,8 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> {
/// async move { svc.call(req).await.map_err(|e| anyhow::anyhow!("{:?}", e)) }
/// });
///
/// let stop_handle = stop_handle.clone();
/// // Upgrade the connection to a HTTP service with graceful shutdown.
/// tokio::spawn(async move {
/// let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
/// let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
/// let stopped = stop_handle.shutdown();
///
/// // Pin the future so that it can be polled.
/// tokio::pin!(stopped, conn);
///
/// let res = match future::select(conn, stopped).await {
/// // Return the connection if not stopped.
/// Either::Left((conn, _)) => conn,
/// // If the server is stopped, we should gracefully shutdown
/// // the connection and poll it until it finishes.
/// Either::Right((_, mut conn)) => {
/// conn.as_mut().graceful_shutdown();
/// conn.await
/// }
/// };
///
/// // Log any errors that might have occurred.
/// if let Err(err) = res {
/// tracing::error!(err=?err, "HTTP connection failed");
/// }
/// });
/// tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
/// }
/// });
///
Expand Down
33 changes: 4 additions & 29 deletions server/src/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::{fmt, sync::atomic::AtomicUsize};

use crate::{stop_channel, RpcModule, Server, ServerBuilder, ServerHandle};
use crate::{serve_with_graceful_shutdown, stop_channel, RpcModule, Server, ServerBuilder, ServerHandle};

use futures_util::future::Either;
use futures_util::{future, FutureExt};
use hyper_util::rt::{TokioExecutor, TokioIo};
use futures_util::FutureExt;
use jsonrpsee_core::server::Methods;
use jsonrpsee_core::{DeserializeOwned, RpcResult, StringError};
use jsonrpsee_test_utils::TimeoutFutureExt;
Expand Down Expand Up @@ -210,8 +208,6 @@ pub(crate) struct Metrics {
}

pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr {
use hyper::service::service_fn;

let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await.unwrap();
let addr = listener.local_addr().unwrap();
let (stop_handle, server_handle) = stop_channel();
Expand Down Expand Up @@ -241,7 +237,7 @@ pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr {
tokio::spawn(async move {
let rpc_svc = rpc_svc.clone();

let svc = service_fn(move |req| {
let svc = tower::service_fn(move |req| {
let is_websocket = crate::ws::is_upgrade_request(&req);
let metrics = metrics.clone();
let mut rpc_svc = rpc_svc.clone();
Expand All @@ -264,28 +260,7 @@ pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr {
}
});

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc);
let stopped = stop_handle.shutdown();

// Pin the future so that it can be polled.
tokio::pin!(stopped, conn);

let res = match future::select(conn, stopped).await {
// Return the connection if not stopped.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
};

// Log any errors that might have occurred.
if let Err(err) = res {
tracing::error!(err=?err, "HTTP connection failed");
}
tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown()));
});
}
});
Expand Down
62 changes: 62 additions & 0 deletions server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,15 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use crate::{HttpBody, HttpRequest};

use futures_util::future::{self, Either};
use hyper_util::rt::{TokioExecutor, TokioIo};
use jsonrpsee_core::BoxError;
use pin_project::pin_project;
use tower::util::Oneshot;
use tower::ServiceExt;
Expand Down Expand Up @@ -79,6 +83,64 @@ where
}
}

/// Serve a service over a TCP connection without graceful shutdown.
/// This means that pending requests will be dropped when the server is stopped.
///
/// If you want to gracefully shutdown the server, use [`serve_with_graceful_shutdown`] instead.
pub async fn serve<S, B, I>(io: I, service: S) -> Result<(), BoxError>
where
S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<BoxError>,
B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
let service = hyper_util::service::TowerToHyperService::new(service);
let io = TokioIo::new(io);

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(io, service);
conn.await
}

/// Serve a service over a TCP connection with graceful shutdown.
/// This means that pending requests will be completed before the server is stopped.
pub async fn serve_with_graceful_shutdown<S, B, I>(
io: I,
service: S,
stopped: impl Future<Output = ()>,
) -> Result<(), BoxError>
where
S: tower::Service<http::Request<hyper::body::Incoming>, Response = http::Response<B>> + Clone + Send + 'static,
S::Future: Send,
S::Response: Send,
S::Error: Into<BoxError>,
B: http_body::Body<Data = hyper::body::Bytes> + Send + 'static,
B::Error: Into<BoxError>,
I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static,
{
let service = hyper_util::service::TowerToHyperService::new(service);
let io = TokioIo::new(io);

let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new());
let conn = builder.serve_connection_with_upgrades(io, service);

tokio::pin!(stopped, conn);

match future::select(conn, stopped).await {
// Return if the connection was completed.
Either::Left((conn, _)) => conn,
// If the server is stopped, we should gracefully shutdown
// the connection and poll it until it finishes.
Either::Right((_, mut conn)) => {
conn.as_mut().graceful_shutdown();
conn.await
}
}
}

/// Helpers to deserialize a request with extensions.
pub(crate) mod deserialize {
/// Helper to deserialize a request with extensions.
Expand Down
Loading

0 comments on commit 8433f48

Please sign in to comment.