diff --git a/examples/examples/jsonrpsee_as_service.rs b/examples/examples/jsonrpsee_as_service.rs index db7b41922b..69ce81638a 100644 --- a/examples/examples/jsonrpsee_as_service.rs +++ b/examples/examples/jsonrpsee_as_service.rs @@ -35,16 +35,14 @@ use std::net::SocketAddr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use futures::future::{self, Either}; use futures::FutureExt; use hyper::header::AUTHORIZATION; use hyper::HeaderMap; -use hyper_util::rt::{TokioExecutor, TokioIo}; use jsonrpsee::core::async_trait; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::proc_macros::rpc; use jsonrpsee::server::middleware::rpc::{ResponseFuture, RpcServiceBuilder, RpcServiceT}; -use jsonrpsee::server::{stop_channel, ServerHandle, StopHandle, TowerServiceBuilder}; +use jsonrpsee::server::{serve_with_graceful_shutdown, stop_channel, ServerHandle, StopHandle, TowerServiceBuilder}; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Request}; use jsonrpsee::ws_client::{HeaderValue, WsClientBuilder}; use jsonrpsee::{MethodResponse, Methods}; @@ -147,8 +145,6 @@ async fn main() -> anyhow::Result<()> { } async fn run_server(metrics: Metrics) -> anyhow::Result { - use hyper::service::service_fn; - let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 9944))).await?; // This state is cloned for every connection @@ -201,7 +197,7 @@ async fn run_server(metrics: Metrics) -> anyhow::Result { }; let per_conn2 = per_conn.clone(); - let svc = service_fn(move |req: hyper::Request| { + let svc = tower::service_fn(move |req: hyper::Request| { let is_websocket = jsonrpsee::server::ws::is_upgrade_request(&req); let transport_label = if is_websocket { "ws" } else { "http" }; let PerConnection { methods, stop_handle, metrics, svc_builder } = per_conn2.clone(); @@ -262,33 +258,7 @@ async fn run_server(metrics: Metrics) -> anyhow::Result { } }); - let per_conn = per_conn.clone(); - tokio::spawn(async move { - let stop_handle2 = per_conn.stop_handle.clone(); - - let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); - let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc); - let stopped = stop_handle2.shutdown(); - - // Pin the future so that it can be polled. - tokio::pin!(stopped, conn); - - let res = match future::select(conn, stopped).await { - // Return the connection if not stopped. - Either::Left((conn, _)) => conn, - // If the server is stopped, we should gracefully shutdown - // the connection and poll it until it finishes. - Either::Right((_, mut conn)) => { - conn.as_mut().graceful_shutdown(); - conn.await - } - }; - - // Log any errors that might have occurred. - if let Err(err) = res { - tracing::error!(err=?err, "HTTP connection failed"); - } - }); + tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown())); } }); diff --git a/examples/examples/jsonrpsee_server_low_level_api.rs b/examples/examples/jsonrpsee_server_low_level_api.rs index 9d6e194c3f..ea8d7d1473 100644 --- a/examples/examples/jsonrpsee_server_low_level_api.rs +++ b/examples/examples/jsonrpsee_server_low_level_api.rs @@ -44,15 +44,15 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::{Arc, Mutex}; -use futures::future::{self, BoxFuture, Either}; +use futures::future::BoxFuture; use futures::FutureExt; -use hyper_util::rt::{TokioExecutor, TokioIo}; use jsonrpsee::core::async_trait; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::proc_macros::rpc; use jsonrpsee::server::middleware::rpc::RpcServiceT; use jsonrpsee::server::{ - http, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder, ServerConfig, ServerHandle, StopHandle, + http, serve_with_graceful_shutdown, stop_channel, ws, ConnectionGuard, ConnectionState, RpcServiceBuilder, + ServerConfig, ServerHandle, StopHandle, }; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned, Request}; use jsonrpsee::ws_client::WsClientBuilder; @@ -158,8 +158,6 @@ async fn main() -> anyhow::Result<()> { } async fn run_server() -> anyhow::Result { - use hyper::service::service_fn; - // Construct our SocketAddr to listen on... let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 9944))).await?; @@ -224,7 +222,7 @@ async fn run_server() -> anyhow::Result { // Create a service handler. let stop_handle2 = per_conn.stop_handle.clone(); let per_conn = per_conn.clone(); - let svc = service_fn(move |req| { + let svc = tower::service_fn(move |req| { let PerConnection { methods, stop_handle, @@ -307,30 +305,7 @@ async fn run_server() -> anyhow::Result { }); // Upgrade the connection to a HTTP service. - tokio::spawn(async move { - let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); - let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc); - let stopped = stop_handle2.shutdown(); - - // Pin the future so that it can be polled. - tokio::pin!(stopped, conn); - - let res = match future::select(conn, stopped).await { - // Return the connection if not stopped. - Either::Left((conn, _)) => conn, - // If the server is stopped, we should gracefully shutdown - // the connection and poll it until it finishes. - Either::Right((_, mut conn)) => { - conn.as_mut().graceful_shutdown(); - conn.await - } - }; - - // Log any errors that might have occurred. - if let Err(err) = res { - tracing::error!(err=?err, "HTTP connection failed"); - } - }); + tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle2.shutdown())); } }); diff --git a/examples/examples/ws_dual_stack.rs b/examples/examples/ws_dual_stack.rs index 7556a7cc95..ac19176676 100644 --- a/examples/examples/ws_dual_stack.rs +++ b/examples/examples/ws_dual_stack.rs @@ -24,11 +24,8 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::future::{self, Either}; -use hyper_util::rt::{TokioExecutor, TokioIo}; -use hyper_util::service::TowerToHyperService; use jsonrpsee::core::client::ClientT; -use jsonrpsee::server::{stop_channel, ServerHandle}; +use jsonrpsee::server::{serve_with_graceful_shutdown, stop_channel, ServerHandle}; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::{rpc_params, RpcModule}; use std::net::SocketAddr; @@ -111,36 +108,8 @@ async fn run_server() -> anyhow::Result<(ServerHandle, Addrs)> { _ = stop_hdl.clone().shutdown() => break, }; - // Clone the service and stop handle to be moved into the spawn - // below. - let svc = svc.clone(); - let stop_hdl2 = stop_hdl.clone(); - // Spawn a new task to serve each respective (Hyper) connection. - tokio::spawn(async move { - let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); - let conn = builder.serve_connection_with_upgrades(TokioIo::new(stream), TowerToHyperService::new(svc)); - let stopped = stop_hdl2.shutdown(); - - // Pin the future so that it can be polled. - tokio::pin!(stopped, conn); - - let res = match future::select(conn, stopped).await { - // Return the connection if not stopped. - Either::Left((conn, _)) => conn, - // If the server is stopped, we should gracefully shutdown - // the connection and poll it until it finishes. - Either::Right((_, mut conn)) => { - conn.as_mut().graceful_shutdown(); - conn.await - } - }; - - // Log any errors that might have occurred. - if let Err(err) = res { - tracing::error!(err=?err, "HTTP connection failed"); - } - }); + tokio::spawn(serve_with_graceful_shutdown(stream, svc.clone(), stop_hdl.clone().shutdown())); } }); diff --git a/server/src/lib.rs b/server/src/lib.rs index 7c0d710f0d..c343e1479c 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -56,5 +56,6 @@ pub use tracing; pub use jsonrpsee_core::http_helpers::{Body as HttpBody, Request as HttpRequest, Response as HttpResponse}; pub use transport::http; pub use transport::ws; +pub use utils::{serve, serve_with_graceful_shutdown}; pub(crate) const LOG_TARGET: &str = "jsonrpsee-server"; diff --git a/server/src/server.rs b/server/src/server.rs index 02aa6ecb4e..735ec307ed 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -784,7 +784,7 @@ impl Builder { /// # Examples /// /// ```no_run - /// use jsonrpsee_server::{Methods, ServerHandle, ws, stop_channel}; + /// use jsonrpsee_server::{Methods, ServerHandle, ws, stop_channel, serve_with_graceful_shutdown}; /// use tower::Service; /// use std::{error::Error as StdError, net::SocketAddr}; /// use futures_util::future::{self, Either}; @@ -820,7 +820,7 @@ impl Builder { /// let svc_builder2 = svc_builder.clone(); /// let methods2 = methods.clone(); /// - /// let svc = hyper::service::service_fn(move |req| { + /// let svc = tower::service_fn(move |req| { /// let stop_handle = stop_handle2.clone(); /// let svc_builder = svc_builder2.clone(); /// let methods = methods2.clone(); @@ -846,32 +846,8 @@ impl Builder { /// async move { svc.call(req).await.map_err(|e| anyhow::anyhow!("{:?}", e)) } /// }); /// - /// let stop_handle = stop_handle.clone(); /// // Upgrade the connection to a HTTP service with graceful shutdown. - /// tokio::spawn(async move { - /// let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); - /// let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc); - /// let stopped = stop_handle.shutdown(); - /// - /// // Pin the future so that it can be polled. - /// tokio::pin!(stopped, conn); - /// - /// let res = match future::select(conn, stopped).await { - /// // Return the connection if not stopped. - /// Either::Left((conn, _)) => conn, - /// // If the server is stopped, we should gracefully shutdown - /// // the connection and poll it until it finishes. - /// Either::Right((_, mut conn)) => { - /// conn.as_mut().graceful_shutdown(); - /// conn.await - /// } - /// }; - /// - /// // Log any errors that might have occurred. - /// if let Err(err) = res { - /// tracing::error!(err=?err, "HTTP connection failed"); - /// } - /// }); + /// tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown())); /// } /// }); /// diff --git a/server/src/tests/helpers.rs b/server/src/tests/helpers.rs index 3b575e9952..49b807e81a 100644 --- a/server/src/tests/helpers.rs +++ b/server/src/tests/helpers.rs @@ -3,11 +3,9 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::{fmt, sync::atomic::AtomicUsize}; -use crate::{stop_channel, RpcModule, Server, ServerBuilder, ServerHandle}; +use crate::{serve_with_graceful_shutdown, stop_channel, RpcModule, Server, ServerBuilder, ServerHandle}; -use futures_util::future::Either; -use futures_util::{future, FutureExt}; -use hyper_util::rt::{TokioExecutor, TokioIo}; +use futures_util::FutureExt; use jsonrpsee_core::server::Methods; use jsonrpsee_core::{DeserializeOwned, RpcResult, StringError}; use jsonrpsee_test_utils::TimeoutFutureExt; @@ -210,8 +208,6 @@ pub(crate) struct Metrics { } pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr { - use hyper::service::service_fn; - let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await.unwrap(); let addr = listener.local_addr().unwrap(); let (stop_handle, server_handle) = stop_channel(); @@ -241,7 +237,7 @@ pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr { tokio::spawn(async move { let rpc_svc = rpc_svc.clone(); - let svc = service_fn(move |req| { + let svc = tower::service_fn(move |req| { let is_websocket = crate::ws::is_upgrade_request(&req); let metrics = metrics.clone(); let mut rpc_svc = rpc_svc.clone(); @@ -264,28 +260,7 @@ pub(crate) async fn ws_server_with_stats(metrics: Metrics) -> SocketAddr { } }); - let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); - let conn = builder.serve_connection_with_upgrades(TokioIo::new(sock), svc); - let stopped = stop_handle.shutdown(); - - // Pin the future so that it can be polled. - tokio::pin!(stopped, conn); - - let res = match future::select(conn, stopped).await { - // Return the connection if not stopped. - Either::Left((conn, _)) => conn, - // If the server is stopped, we should gracefully shutdown - // the connection and poll it until it finishes. - Either::Right((_, mut conn)) => { - conn.as_mut().graceful_shutdown(); - conn.await - } - }; - - // Log any errors that might have occurred. - if let Err(err) = res { - tracing::error!(err=?err, "HTTP connection failed"); - } + tokio::spawn(serve_with_graceful_shutdown(sock, svc, stop_handle.clone().shutdown())); }); } }); diff --git a/server/src/utils.rs b/server/src/utils.rs index 0337c65d65..d510a84661 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -24,11 +24,15 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use crate::{HttpBody, HttpRequest}; +use futures_util::future::{self, Either}; +use hyper_util::rt::{TokioExecutor, TokioIo}; +use jsonrpsee_core::BoxError; use pin_project::pin_project; use tower::util::Oneshot; use tower::ServiceExt; @@ -79,6 +83,64 @@ where } } +/// Serve a service over a TCP connection without graceful shutdown. +/// This means that pending requests will be dropped when the server is stopped. +/// +/// If you want to gracefully shutdown the server, use [`serve_with_graceful_shutdown`] instead. +pub async fn serve(io: I, service: S) -> Result<(), BoxError> +where + S: tower::Service, Response = http::Response> + Clone + Send + 'static, + S::Future: Send, + S::Response: Send, + S::Error: Into, + B: http_body::Body + Send + 'static, + B::Error: Into, + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, +{ + let service = hyper_util::service::TowerToHyperService::new(service); + let io = TokioIo::new(io); + + let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + let conn = builder.serve_connection_with_upgrades(io, service); + conn.await +} + +/// Serve a service over a TCP connection with graceful shutdown. +/// This means that pending requests will be completed before the server is stopped. +pub async fn serve_with_graceful_shutdown( + io: I, + service: S, + stopped: impl Future, +) -> Result<(), BoxError> +where + S: tower::Service, Response = http::Response> + Clone + Send + 'static, + S::Future: Send, + S::Response: Send, + S::Error: Into, + B: http_body::Body + Send + 'static, + B::Error: Into, + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, +{ + let service = hyper_util::service::TowerToHyperService::new(service); + let io = TokioIo::new(io); + + let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); + let conn = builder.serve_connection_with_upgrades(io, service); + + tokio::pin!(stopped, conn); + + match future::select(conn, stopped).await { + // Return if the connection was completed. + Either::Left((conn, _)) => conn, + // If the server is stopped, we should gracefully shutdown + // the connection and poll it until it finishes. + Either::Right((_, mut conn)) => { + conn.as_mut().graceful_shutdown(); + conn.await + } + } +} + /// Helpers to deserialize a request with extensions. pub(crate) mod deserialize { /// Helper to deserialize a request with extensions. diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index a62875d11d..215336a6b5 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -34,15 +34,13 @@ use std::time::Duration; use fast_socks5::client::Socks5Stream; use fast_socks5::server; -use futures::future::{self, Either}; use futures::{SinkExt, Stream, StreamExt}; -use hyper_util::rt::{TokioExecutor, TokioIo}; use jsonrpsee::server::middleware::http::ProxyGetRequestLayer; use jsonrpsee::server::middleware::rpc::RpcServiceT; use jsonrpsee::server::{ - stop_channel, PendingSubscriptionSink, RpcModule, RpcServiceBuilder, Server, ServerBuilder, ServerHandle, - SubscriptionMessage, TrySendError, + serve_with_graceful_shutdown, stop_channel, PendingSubscriptionSink, RpcModule, RpcServiceBuilder, Server, + ServerBuilder, ServerHandle, SubscriptionMessage, TrySendError, }; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; use jsonrpsee::{Methods, SubscriptionCloseResponse}; @@ -215,7 +213,7 @@ pub async fn server() -> SocketAddr { let stop_hdl2 = stop_hdl.clone(); let svc_builder2 = svc_builder.clone(); let conn_id2 = conn_id.clone(); - let svc = hyper::service::service_fn(move |req: hyper::Request| { + let svc = tower::service_fn(move |req: hyper::Request| { let connection_id = conn_id2.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let rpc_middleware = RpcServiceBuilder::default() .layer_fn(move |service| ConnectionDetails { inner: service, connection_id }); @@ -230,32 +228,8 @@ pub async fn server() -> SocketAddr { async move { tower_service.call(req).await.map_err(|e| anyhow::anyhow!("{:?}", e)) } }); - let stop_hdl2 = stop_hdl.clone(); // Spawn a new task to serve each respective (Hyper) connection. - tokio::spawn(async move { - let builder = hyper_util::server::conn::auto::Builder::new(TokioExecutor::new()); - let conn = builder.serve_connection_with_upgrades(TokioIo::new(stream), svc); - let stopped = stop_hdl2.shutdown(); - - // Pin the future so that it can be polled. - tokio::pin!(stopped, conn); - - let res = match future::select(conn, stopped).await { - // Return the connection if not stopped. - Either::Left((conn, _)) => conn, - // If the server is stopped, we should gracefully shutdown - // the connection and poll it until it finishes. - Either::Right((_, mut conn)) => { - conn.as_mut().graceful_shutdown(); - conn.await - } - }; - - // Log any errors that might have occurred. - if let Err(err) = res { - tracing::error!(err=?err, "HTTP connection failed"); - } - }); + tokio::spawn(serve_with_graceful_shutdown(stream, svc, stop_hdl.clone().shutdown())); } });