Skip to content

Commit

Permalink
address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Mar 10, 2023
1 parent faf558d commit b6242bb
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
5 changes: 1 addition & 4 deletions graph/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,7 @@ impl EndpointMetrics {
.map(|h| (Host::from(h.as_ref()), AtomicU64::new(0))),
));

Self {
logger: logger.clone(),
hosts: hosts.clone(),
}
Self { logger, hosts }
}

/// This should only be used for testing.
Expand Down
48 changes: 25 additions & 23 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl SubgraphLimit {
SubgraphLimit::Limit(total) => {
let total = *total;
if current >= total {
return AvailableCapacity::Unavailable
return AvailableCapacity::Unavailable;
}

let used_percent = current * 100 / total;
Expand Down Expand Up @@ -186,7 +186,7 @@ impl FirehoseEndpoint {
pub fn get_capacity(self: &Arc<Self>) -> AvailableCapacity {
self.subgraph_limit
.get_capacity(Arc::strong_count(self).saturating_sub(1))
}
}

fn new_client(
&self,
Expand Down Expand Up @@ -412,30 +412,33 @@ impl FirehoseEndpoints {
self.0.len()
}

/// This function will attempt to grab an endpoint based on the following priority:
/// 1. Lowest error count with high capacity available.
/// 2. Lowest error count that has any capacity
/// If an adapter cannot be found `endpoint` will return an error.
/// This function will attempt to grab an endpoint based on the Lowest error count
// with high capacity available. If an adapter cannot be found `endpoint` will
// return an error.
pub fn endpoint(&self) -> anyhow::Result<Arc<FirehoseEndpoint>> {
let endpoint = self.0.iter().sorted_by_key(|x| x.current_error_count()).try_fold(None,|acc,adapter| {
match adapter.get_capacity() {
AvailableCapacity::Unavailable => ControlFlow::Continue(acc),
AvailableCapacity::Low => match acc {
Some(_) => ControlFlow::Continue(acc),
None => ControlFlow::Continue(Some(adapter)),
let endpoint = self
.0
.iter()
.sorted_by_key(|x| x.current_error_count())
.try_fold(None, |acc, adapter| {
match adapter.get_capacity() {
AvailableCapacity::Unavailable => ControlFlow::Continue(acc),
AvailableCapacity::Low => match acc {
Some(_) => ControlFlow::Continue(acc),
None => ControlFlow::Continue(Some(adapter)),
},
// This means that if all adapters with low/no errors are low capacity
// we will retry the high capacity that has errors, at this point
// any other available with no errors are almost at their limit.
AvailableCapacity::High => ControlFlow::Break(Some(adapter)),
}
// This means that if all adapters with low/no errors are low capacity
// we will retry the high capacity that has errors, at this point
// any other available with no errors are almost at their limit.
AvailableCapacity::High => ControlFlow::Break(Some(adapter)),
}
});

});

match endpoint {
ControlFlow::Continue(adapter)|ControlFlow::Break(adapter) =>
adapter.cloned().ok_or(anyhow!("unable to get a connection, increase the firehose conn_pool_size or limit for the node"))
ControlFlow::Continue(adapter)
| ControlFlow::Break(adapter) =>
adapter.cloned().ok_or(anyhow!("unable to get a connection, increase the firehose conn_pool_size or limit for the node"))
}

}

pub fn remove(&mut self, provider: &str) {
Expand Down Expand Up @@ -652,7 +655,6 @@ mod test {
high_availability.clone(),
]);


let res = endpoints.endpoint().unwrap();
assert_eq!(res.provider, high_availability.provider);

Expand Down

0 comments on commit b6242bb

Please sign in to comment.