diff --git a/graph/src/ipfs/client.rs b/graph/src/ipfs/client.rs index d9df6cafb67..90da991152a 100644 --- a/graph/src/ipfs/client.rs +++ b/graph/src/ipfs/client.rs @@ -36,16 +36,19 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult>> { - let fut = retry_policy.create("IPFS.cat_stream", self.logger()).run({ - let path = path.to_owned(); + let fut = retry_policy + .create("IPFS.cat_stream", self.logger()) + .no_timeout() + .run({ + let path = path.to_owned(); - move || { - let path = path.clone(); - let client = self.clone(); + move || { + let path = path.clone(); + let client = self.clone(); - async move { client.call(IpfsRequest::Cat(path)).await } - } - }); + async move { client.call(IpfsRequest::Cat(path)).await } + } + }); let resp = run_with_optional_timeout(path, fut, timeout).await?; @@ -63,22 +66,25 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { - let fut = retry_policy.create("IPFS.cat", self.logger()).run({ - let path = path.to_owned(); - - move || { - let path = path.clone(); - let client = self.clone(); - - async move { - client - .call(IpfsRequest::Cat(path)) - .await? - .bytes(Some(max_size)) - .await + let fut = retry_policy + .create("IPFS.cat", self.logger()) + .no_timeout() + .run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::Cat(path)) + .await? + .bytes(Some(max_size)) + .await + } } - } - }); + }); run_with_optional_timeout(path, fut, timeout).await } @@ -93,22 +99,25 @@ pub trait IpfsClient: Send + Sync + 'static { timeout: Option, retry_policy: RetryPolicy, ) -> IpfsResult { - let fut = retry_policy.create("IPFS.get_block", self.logger()).run({ - let path = path.to_owned(); - - move || { - let path = path.clone(); - let client = self.clone(); - - async move { - client - .call(IpfsRequest::GetBlock(path)) - .await? - .bytes(None) - .await + let fut = retry_policy + .create("IPFS.get_block", self.logger()) + .no_timeout() + .run({ + let path = path.to_owned(); + + move || { + let path = path.clone(); + let client = self.clone(); + + async move { + client + .call(IpfsRequest::GetBlock(path)) + .await? + .bytes(None) + .await + } } - } - }); + }); run_with_optional_timeout(path, fut, timeout).await } diff --git a/graph/src/ipfs/gateway_client.rs b/graph/src/ipfs/gateway_client.rs index 4f4844f0147..1ee36bb6609 100644 --- a/graph/src/ipfs/gateway_client.rs +++ b/graph/src/ipfs/gateway_client.rs @@ -80,6 +80,8 @@ impl IpfsGatewayClient { let fut = RetryPolicy::NonDeterministic .create("IPFS.Gateway.send_test_request", &self.logger) + .no_logging() + .no_timeout() .run(move || { let req = req.try_clone().expect("request can be cloned"); diff --git a/graph/src/ipfs/mod.rs b/graph/src/ipfs/mod.rs index 9770ab497db..f2131916e6d 100644 --- a/graph/src/ipfs/mod.rs +++ b/graph/src/ipfs/mod.rs @@ -1,6 +1,9 @@ use std::sync::Arc; use anyhow::anyhow; +use futures03::future::BoxFuture; +use futures03::stream::FuturesUnordered; +use futures03::stream::StreamExt; use slog::info; use slog::Logger; @@ -55,40 +58,7 @@ where SafeDisplay(server_address) ); - match IpfsGatewayClient::new(server_address, logger).await { - Ok(client) => { - info!( - logger, - "Successfully connected to IPFS gateway at: '{}'", - SafeDisplay(server_address) - ); - - clients.push(Arc::new(client)); - continue; - } - Err(err) if err.is_invalid_server() => {} - Err(err) => return Err(err), - }; - - match IpfsRpcClient::new(server_address, logger).await { - Ok(client) => { - info!( - logger, - "Successfully connected to IPFS RPC API at: '{}'", - SafeDisplay(server_address) - ); - - clients.push(Arc::new(client)); - continue; - } - Err(err) if err.is_invalid_server() => {} - Err(err) => return Err(err), - }; - - return Err(IpfsError::InvalidServer { - server_address: server_address.parse()?, - reason: anyhow!("unknown server kind"), - }); + clients.push(use_first_valid_api(server_address, logger).await?); } match clients.len() { @@ -106,3 +76,51 @@ where } } } + +async fn use_first_valid_api( + server_address: &str, + logger: &Logger, +) -> IpfsResult> { + let supported_apis: Vec>>> = vec![ + Box::pin(async { + IpfsGatewayClient::new(server_address, logger) + .await + .map(|client| { + info!( + logger, + "Successfully connected to IPFS gateway at: '{}'", + SafeDisplay(server_address) + ); + + Arc::new(client) as Arc + }) + }), + Box::pin(async { + IpfsRpcClient::new(server_address, logger) + .await + .map(|client| { + info!( + logger, + "Successfully connected to IPFS RPC API at: '{}'", + SafeDisplay(server_address) + ); + + Arc::new(client) as Arc + }) + }), + ]; + + let mut stream = supported_apis.into_iter().collect::>(); + while let Some(result) = stream.next().await { + match result { + Ok(client) => return Ok(client), + Err(err) if err.is_invalid_server() => {} + Err(err) => return Err(err), + }; + } + + Err(IpfsError::InvalidServer { + server_address: server_address.parse()?, + reason: anyhow!("unknown server kind"), + }) +} diff --git a/graph/src/ipfs/retry_policy.rs b/graph/src/ipfs/retry_policy.rs index 37ffec81372..88880d1fdab 100644 --- a/graph/src/ipfs/retry_policy.rs +++ b/graph/src/ipfs/retry_policy.rs @@ -2,7 +2,7 @@ use slog::Logger; use crate::ipfs::error::IpfsError; use crate::util::futures::retry; -use crate::util::futures::RetryConfigNoTimeout; +use crate::util::futures::RetryConfig; /// This is a safety mechanism to prevent infinite spamming of IPFS servers /// in the event of logical or unhandled deterministic errors. @@ -24,14 +24,11 @@ pub enum RetryPolicy { impl RetryPolicy { /// Creates a retry policy for every request sent to IPFS servers. - /// - /// Note: It is expected that retries will be wrapped in timeouts - /// when necessary to make them more flexible. pub(super) fn create( self, operation_name: impl ToString, logger: &Logger, - ) -> RetryConfigNoTimeout { + ) -> RetryConfig { retry(operation_name, logger) .limit(DEFAULT_MAX_ATTEMPTS) .when(move |result: &Result| match result { @@ -42,7 +39,6 @@ impl RetryPolicy { Self::NonDeterministic => !err.is_deterministic(), }, }) - .no_timeout() } } @@ -69,6 +65,7 @@ mod tests { let err = RetryPolicy::None .create::<()>("test", &discard()) + .no_timeout() .run({ let counter = counter.clone(); move || { @@ -92,6 +89,7 @@ mod tests { let err = RetryPolicy::Networking .create("test", &discard()) + .no_timeout() .run({ let counter = counter.clone(); move || { @@ -126,6 +124,7 @@ mod tests { RetryPolicy::Networking .create("test", &discard()) + .no_timeout() .run({ let counter = counter.clone(); move || { @@ -159,6 +158,7 @@ mod tests { let err = RetryPolicy::NonDeterministic .create::<()>("test", &discard()) + .no_timeout() .run({ let counter = counter.clone(); move || { @@ -190,6 +190,7 @@ mod tests { RetryPolicy::NonDeterministic .create("test", &discard()) + .no_timeout() .run({ let counter = counter.clone(); move || { diff --git a/graph/src/ipfs/rpc_client.rs b/graph/src/ipfs/rpc_client.rs index 8c0ff5f5acb..cd35d55b0ed 100644 --- a/graph/src/ipfs/rpc_client.rs +++ b/graph/src/ipfs/rpc_client.rs @@ -69,6 +69,8 @@ impl IpfsRpcClient { async fn send_test_request(&self) -> anyhow::Result<()> { let fut = RetryPolicy::NonDeterministic .create("IPFS.RPC.send_test_request", &self.logger) + .no_logging() + .no_timeout() .run({ let client = self.to_owned();