Skip to content

Commit

Permalink
ipfs: improve ipfs client selection (#5723)
Browse files Browse the repository at this point in the history
  • Loading branch information
isum authored Jan 8, 2025
1 parent 1d29605 commit 453bcea
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 78 deletions.
85 changes: 47 additions & 38 deletions graph/src/ipfs/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@ pub trait IpfsClient: Send + Sync + 'static {
timeout: Option<Duration>,
retry_policy: RetryPolicy,
) -> IpfsResult<BoxStream<'static, IpfsResult<Bytes>>> {
let fut = retry_policy.create("IPFS.cat_stream", self.logger()).run({
let path = path.to_owned();
let fut = retry_policy
.create("IPFS.cat_stream", self.logger())
.no_timeout()
.run({
let path = path.to_owned();

move || {
let path = path.clone();
let client = self.clone();
move || {
let path = path.clone();
let client = self.clone();

async move { client.call(IpfsRequest::Cat(path)).await }
}
});
async move { client.call(IpfsRequest::Cat(path)).await }
}
});

let resp = run_with_optional_timeout(path, fut, timeout).await?;

Expand All @@ -63,22 +66,25 @@ pub trait IpfsClient: Send + Sync + 'static {
timeout: Option<Duration>,
retry_policy: RetryPolicy,
) -> IpfsResult<Bytes> {
let fut = retry_policy.create("IPFS.cat", self.logger()).run({
let path = path.to_owned();

move || {
let path = path.clone();
let client = self.clone();

async move {
client
.call(IpfsRequest::Cat(path))
.await?
.bytes(Some(max_size))
.await
let fut = retry_policy
.create("IPFS.cat", self.logger())
.no_timeout()
.run({
let path = path.to_owned();

move || {
let path = path.clone();
let client = self.clone();

async move {
client
.call(IpfsRequest::Cat(path))
.await?
.bytes(Some(max_size))
.await
}
}
}
});
});

run_with_optional_timeout(path, fut, timeout).await
}
Expand All @@ -93,22 +99,25 @@ pub trait IpfsClient: Send + Sync + 'static {
timeout: Option<Duration>,
retry_policy: RetryPolicy,
) -> IpfsResult<Bytes> {
let fut = retry_policy.create("IPFS.get_block", self.logger()).run({
let path = path.to_owned();

move || {
let path = path.clone();
let client = self.clone();

async move {
client
.call(IpfsRequest::GetBlock(path))
.await?
.bytes(None)
.await
let fut = retry_policy
.create("IPFS.get_block", self.logger())
.no_timeout()
.run({
let path = path.to_owned();

move || {
let path = path.clone();
let client = self.clone();

async move {
client
.call(IpfsRequest::GetBlock(path))
.await?
.bytes(None)
.await
}
}
}
});
});

run_with_optional_timeout(path, fut, timeout).await
}
Expand Down
2 changes: 2 additions & 0 deletions graph/src/ipfs/gateway_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ impl IpfsGatewayClient {

let fut = RetryPolicy::NonDeterministic
.create("IPFS.Gateway.send_test_request", &self.logger)
.no_logging()
.no_timeout()
.run(move || {
let req = req.try_clone().expect("request can be cloned");

Expand Down
86 changes: 52 additions & 34 deletions graph/src/ipfs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::Arc;

use anyhow::anyhow;
use futures03::future::BoxFuture;
use futures03::stream::FuturesUnordered;
use futures03::stream::StreamExt;
use slog::info;
use slog::Logger;

Expand Down Expand Up @@ -55,40 +58,7 @@ where
SafeDisplay(server_address)
);

match IpfsGatewayClient::new(server_address, logger).await {
Ok(client) => {
info!(
logger,
"Successfully connected to IPFS gateway at: '{}'",
SafeDisplay(server_address)
);

clients.push(Arc::new(client));
continue;
}
Err(err) if err.is_invalid_server() => {}
Err(err) => return Err(err),
};

match IpfsRpcClient::new(server_address, logger).await {
Ok(client) => {
info!(
logger,
"Successfully connected to IPFS RPC API at: '{}'",
SafeDisplay(server_address)
);

clients.push(Arc::new(client));
continue;
}
Err(err) if err.is_invalid_server() => {}
Err(err) => return Err(err),
};

return Err(IpfsError::InvalidServer {
server_address: server_address.parse()?,
reason: anyhow!("unknown server kind"),
});
clients.push(use_first_valid_api(server_address, logger).await?);
}

match clients.len() {
Expand All @@ -106,3 +76,51 @@ where
}
}
}

async fn use_first_valid_api(
server_address: &str,
logger: &Logger,
) -> IpfsResult<Arc<dyn IpfsClient>> {
let supported_apis: Vec<BoxFuture<IpfsResult<Arc<dyn IpfsClient>>>> = vec![
Box::pin(async {
IpfsGatewayClient::new(server_address, logger)
.await
.map(|client| {
info!(
logger,
"Successfully connected to IPFS gateway at: '{}'",
SafeDisplay(server_address)
);

Arc::new(client) as Arc<dyn IpfsClient>
})
}),
Box::pin(async {
IpfsRpcClient::new(server_address, logger)
.await
.map(|client| {
info!(
logger,
"Successfully connected to IPFS RPC API at: '{}'",
SafeDisplay(server_address)
);

Arc::new(client) as Arc<dyn IpfsClient>
})
}),
];

let mut stream = supported_apis.into_iter().collect::<FuturesUnordered<_>>();
while let Some(result) = stream.next().await {
match result {
Ok(client) => return Ok(client),
Err(err) if err.is_invalid_server() => {}
Err(err) => return Err(err),
};
}

Err(IpfsError::InvalidServer {
server_address: server_address.parse()?,
reason: anyhow!("unknown server kind"),
})
}
13 changes: 7 additions & 6 deletions graph/src/ipfs/retry_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use slog::Logger;

use crate::ipfs::error::IpfsError;
use crate::util::futures::retry;
use crate::util::futures::RetryConfigNoTimeout;
use crate::util::futures::RetryConfig;

/// This is a safety mechanism to prevent infinite spamming of IPFS servers
/// in the event of logical or unhandled deterministic errors.
Expand All @@ -24,14 +24,11 @@ pub enum RetryPolicy {

impl RetryPolicy {
/// Creates a retry policy for every request sent to IPFS servers.
///
/// Note: It is expected that retries will be wrapped in timeouts
/// when necessary to make them more flexible.
pub(super) fn create<O: Send + Sync + 'static>(
self,
operation_name: impl ToString,
logger: &Logger,
) -> RetryConfigNoTimeout<O, IpfsError> {
) -> RetryConfig<O, IpfsError> {
retry(operation_name, logger)
.limit(DEFAULT_MAX_ATTEMPTS)
.when(move |result: &Result<O, IpfsError>| match result {
Expand All @@ -42,7 +39,6 @@ impl RetryPolicy {
Self::NonDeterministic => !err.is_deterministic(),
},
})
.no_timeout()
}
}

Expand All @@ -69,6 +65,7 @@ mod tests {

let err = RetryPolicy::None
.create::<()>("test", &discard())
.no_timeout()
.run({
let counter = counter.clone();
move || {
Expand All @@ -92,6 +89,7 @@ mod tests {

let err = RetryPolicy::Networking
.create("test", &discard())
.no_timeout()
.run({
let counter = counter.clone();
move || {
Expand Down Expand Up @@ -126,6 +124,7 @@ mod tests {

RetryPolicy::Networking
.create("test", &discard())
.no_timeout()
.run({
let counter = counter.clone();
move || {
Expand Down Expand Up @@ -159,6 +158,7 @@ mod tests {

let err = RetryPolicy::NonDeterministic
.create::<()>("test", &discard())
.no_timeout()
.run({
let counter = counter.clone();
move || {
Expand Down Expand Up @@ -190,6 +190,7 @@ mod tests {

RetryPolicy::NonDeterministic
.create("test", &discard())
.no_timeout()
.run({
let counter = counter.clone();
move || {
Expand Down
2 changes: 2 additions & 0 deletions graph/src/ipfs/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ impl IpfsRpcClient {
async fn send_test_request(&self) -> anyhow::Result<()> {
let fut = RetryPolicy::NonDeterministic
.create("IPFS.RPC.send_test_request", &self.logger)
.no_logging()
.no_timeout()
.run({
let client = self.to_owned();

Expand Down

0 comments on commit 453bcea

Please sign in to comment.