From 8e022c56c60deede858dcecac18bec8b51f0c254 Mon Sep 17 00:00:00 2001
From: Filipe Azevedo
Date: Wed, 8 Mar 2023 19:21:36 +0000
Subject: [PATCH] use error code to get firehose adapter

---
 graph/src/blockchain/firehose_block_stream.rs |   1 +
 graph/src/endpoint.rs                         |   6 +-
 graph/src/firehose/endpoints.rs               |  25 ++-
 graph/src/firehose/sf.firehose.v2.rs          | 159 +++++++++++-------
 node/src/config.rs                            |   8 +-
 5 files changed, 128 insertions(+), 71 deletions(-)

diff --git a/graph/src/blockchain/firehose_block_stream.rs b/graph/src/blockchain/firehose_block_stream.rs
index 72391a1f645..8004c241548 100644
--- a/graph/src/blockchain/firehose_block_stream.rs
+++ b/graph/src/blockchain/firehose_block_stream.rs
@@ -226,6 +226,7 @@ fn stream_blocks>(
             "start_block" => start_block_num,
             "subgraph" => &deployment,
             "cursor" => latest_cursor.to_string(),
+            "provider_err_count" => endpoint.current_error_count(),
         );
 
         // We just reconnected, assume that we want to back off on errors
diff --git a/graph/src/endpoint.rs b/graph/src/endpoint.rs
index 7a8a1705313..20f80fc2711 100644
--- a/graph/src/endpoint.rs
+++ b/graph/src/endpoint.rs
@@ -37,6 +37,7 @@ impl std::ops::Deref for Host {
     }
 }
 
+#[derive(Debug)]
 enum EndpointMetric {
     Success(Host),
     Failure(Host),
@@ -50,7 +51,7 @@ pub struct EndpointMetrics {
 }
 
 impl EndpointMetrics {
-    #[cfg(debug_assertions)]
+    /// This should only be used for testing.
     pub fn noop() -> Self {
         use slog::{o, Discard};
         let (sender, _) = mpsc::unbounded_channel();
@@ -135,7 +136,8 @@ impl EndpointMetricsProcessor {
 
     pub async fn run(mut self) {
         loop {
-            match self.receiver.recv().await {
+            let event = self.receiver.recv().await;
+            match event {
                 Some(EndpointMetric::Success(host)) => {
                     if let Some(count) = self.hosts.get(&host) {
                         count.store(0, Ordering::Relaxed);
diff --git a/graph/src/firehose/endpoints.rs b/graph/src/firehose/endpoints.rs
index 820fe683584..023c4ba0406 100644
--- a/graph/src/firehose/endpoints.rs
+++ b/graph/src/firehose/endpoints.rs
@@ -12,9 +12,9 @@ use crate::{
 use crate::firehose::fetch_client::FetchClient;
 use crate::firehose::interceptors::AuthInterceptor;
-use anyhow::bail;
 use futures03::StreamExt;
 use http::uri::{Scheme, Uri};
+use itertools::Itertools;
 use slog::Logger;
 use std::{collections::BTreeMap, fmt::Display, sync::Arc, time::Duration};
 use tonic::codegen::InterceptedService;
@@ -136,6 +136,10 @@ impl FirehoseEndpoint {
         }
     }
 
+    pub fn current_error_count(&self) -> u64 {
+        self.endpoint_metrics.get_count(&self.host)
+    }
+
     // we need to -1 because there will always be a reference
     // inside FirehoseEndpoints that is not used (is always cloned).
     pub fn has_subgraph_capacity(self: &Arc<Self>) -> bool {
@@ -370,14 +374,17 @@ impl FirehoseEndpoints {
     // selects the FirehoseEndpoint with the least amount of references, which will help with splitting
     // the load naively across the entire list.
     pub fn random(&self) -> anyhow::Result<Arc<FirehoseEndpoint>> {
+        // This isn't really efficient; we could probably find a better way,
+        // since the repeated endpoints usually boil down to a handful of
+        // different hosts. A map keyed by host might be a better fit
+        // here.
let endpoint = self .0 .iter() - .min_by_key(|x| Arc::strong_count(x)) - .ok_or(anyhow!("no available firehose endpoints"))?; - if !endpoint.has_subgraph_capacity() { - bail!("all connections saturated with {} connections, increase the firehose conn_pool_size or limit for the node", SUBGRAPHS_PER_CONN); - } + .filter(|x| x.has_subgraph_capacity()) + .sorted_by_key(|x| x.current_error_count()) + .find(|x| x.has_subgraph_capacity()) + .ok_or(anyhow!("unable to get a connection: {} connections in use, increase the firehose conn_pool_size or limit for the node", self.0.len()))?; // Cloning here ensure we have the correct count at any given time, if we return a reference it can be cloned later // which could cause a high number of endpoints to be given away before accounting for them. @@ -479,7 +486,7 @@ mod test { endpoints.remove(""); let err = endpoints.random().unwrap_err(); - assert!(err.to_string().contains("no available firehose endpoints")); + assert!(err.to_string().contains("unable to get a connection")); } #[tokio::test] @@ -511,7 +518,7 @@ mod test { endpoints.remove(""); let err = endpoints.random().unwrap_err(); - assert!(err.to_string().contains("no available firehose endpoints")); + assert!(err.to_string().contains("unable to get a connection")); } #[tokio::test] @@ -535,6 +542,6 @@ mod test { endpoints.remove(""); let err = endpoints.random().unwrap_err(); - assert!(err.to_string().contains("no available firehose endpoints")); + assert!(err.to_string().contains("unable to get a connection")); } } diff --git a/graph/src/firehose/sf.firehose.v2.rs b/graph/src/firehose/sf.firehose.v2.rs index bd7a594ba18..6a5b9d35204 100644 --- a/graph/src/firehose/sf.firehose.v2.rs +++ b/graph/src/firehose/sf.firehose.v2.rs @@ -142,8 +142,8 @@ impl ForkStep { /// Generated client implementations. pub mod stream_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct StreamClient { inner: tonic::client::Grpc, @@ -187,8 +187,9 @@ pub mod stream_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { StreamClient::new(InterceptedService::new(inner, interceptor)) } @@ -210,27 +211,32 @@ pub mod stream_client { pub async fn blocks( &mut self, request: impl tonic::IntoRequest, - ) -> Result>, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/sf.firehose.v2.Stream/Blocks"); + ) -> Result< + tonic::Response>, + tonic::Status, + > { self.inner - .server_streaming(request.into_request(), path, codec) + .ready() .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sf.firehose.v2.Stream/Blocks", + ); + self.inner.server_streaming(request.into_request(), path, codec).await } } } /// Generated client implementations. 
pub mod fetch_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; #[derive(Debug, Clone)] pub struct FetchClient { inner: tonic::client::Grpc, @@ -274,8 +280,9 @@ pub mod fetch_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { FetchClient::new(InterceptedService::new(inner, interceptor)) } @@ -298,14 +305,19 @@ pub mod fetch_client { &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/sf.firehose.v2.Fetch/Block"); + let path = http::uri::PathAndQuery::from_static( + "/sf.firehose.v2.Fetch/Block", + ); self.inner.unary(request.into_request(), path, codec).await } } @@ -318,7 +330,9 @@ pub mod stream_server { #[async_trait] pub trait Stream: Send + Sync + 'static { /// Server streaming response type for the Blocks method. - type BlocksStream: futures_core::Stream> + type BlocksStream: futures_core::Stream< + Item = Result, + > + Send + 'static; async fn blocks( @@ -345,7 +359,10 @@ pub mod stream_server { send_compression_encodings: Default::default(), } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -373,7 +390,10 @@ pub mod stream_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -382,11 +402,14 @@ pub mod stream_server { "/sf.firehose.v2.Stream/Blocks" => { #[allow(non_camel_case_types)] struct BlocksSvc(pub Arc); - impl tonic::server::ServerStreamingService for BlocksSvc { + impl tonic::server::ServerStreamingService + for BlocksSvc { type Response = super::Response; type ResponseStream = T::BlocksStream; - type Future = - BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -403,23 +426,28 @@ pub mod stream_server { let inner = inner.0; let method = BlocksSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } @@ -478,7 
+506,10 @@ pub mod fetch_server { send_compression_encodings: Default::default(), } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -506,7 +537,10 @@ pub mod fetch_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -515,9 +549,13 @@ pub mod fetch_server { "/sf.firehose.v2.Fetch/Block" => { #[allow(non_camel_case_types)] struct BlockSvc(pub Arc); - impl tonic::server::UnaryService for BlockSvc { + impl tonic::server::UnaryService + for BlockSvc { type Response = super::SingleBlockResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -534,23 +572,28 @@ pub mod fetch_server { let inner = inner.0; let method = BlockSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/node/src/config.rs b/node/src/config.rs index 0d4dc20ea28..940433493dd 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -565,12 +565,16 @@ pub enum ProviderDetails { impl ProviderDetails { pub fn url(&self) -> String { - match self { + let url = match self { ProviderDetails::Substreams(firehose) | ProviderDetails::Firehose(firehose) => { firehose.url.clone() } ProviderDetails::Web3(web3) | ProviderDetails::Web3Call(web3) => web3.url.to_string(), - } + }; + + // parsing and printing here normalizes the urls so we don't have + // mismatches later on. + url.parse::().expect("failed to parse url").to_string() } }
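
Note: the behavioural core of this patch is the new selection policy in FirehoseEndpoints::random, which skips endpoints whose connections are saturated and then prefers the one with the fewest errors recorded by EndpointMetrics (a failure bumps the per-host counter, a success resets it). The sketch below shows that policy in isolation and is only illustrative: Endpoint, has_capacity, pick and the hard-coded host names are stand-ins rather than the graph crate's API, and it uses std's min_by_key where the patch reaches for itertools' sorted_by_key.

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    // Illustrative stand-in for FirehoseEndpoint: a host, an error counter that a
    // metrics task would reset on success and bump on failure, and a connection limit.
    struct Endpoint {
        host: &'static str,
        errors: AtomicU64,
        conn_limit: usize,
    }

    // Mirrors the idea behind has_subgraph_capacity: the pool itself always holds
    // one reference to the endpoint, so subtract it before comparing to the limit.
    fn has_capacity(endpoint: &Arc<Endpoint>) -> bool {
        Arc::strong_count(endpoint) - 1 < endpoint.conn_limit
    }

    // The selection policy in isolation: drop saturated endpoints, then hand out
    // the one with the fewest recorded errors. Cloning the Arc bumps the strong
    // count, so the connection stays accounted for until the caller drops it.
    fn pick(pool: &[Arc<Endpoint>]) -> Option<Arc<Endpoint>> {
        pool.iter()
            .filter(|endpoint| has_capacity(endpoint))
            .min_by_key(|endpoint| endpoint.errors.load(Ordering::Relaxed))
            .cloned()
    }

    fn main() {
        let pool = vec![
            Arc::new(Endpoint { host: "firehose-a", errors: AtomicU64::new(3), conn_limit: 100 }),
            Arc::new(Endpoint { host: "firehose-b", errors: AtomicU64::new(0), conn_limit: 100 }),
        ];

        let chosen = pick(&pool).expect("all endpoints are saturated");
        assert_eq!(chosen.host, "firehose-b"); // fewest errors wins
        println!("picked {}", chosen.host);
    }

In the patch itself the per-host counters live in EndpointMetrics, and the new provider_err_count field logged in firehose_block_stream.rs surfaces the same value that drives the selection.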