Skip to content

Commit

Permalink
use error code to get firehose adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Mar 8, 2023
1 parent 06c4dee commit 8e022c5
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 71 deletions.
1 change: 1 addition & 0 deletions graph/src/blockchain/firehose_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ fn stream_blocks<C: Blockchain, F: FirehoseMapper<C>>(
"start_block" => start_block_num,
"subgraph" => &deployment,
"cursor" => latest_cursor.to_string(),
"provider_err_count" => endpoint.current_error_count(),
);

// We just reconnected, assume that we want to back off on errors
Expand Down
6 changes: 4 additions & 2 deletions graph/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl std::ops::Deref for Host {
}
}

#[derive(Debug)]
enum EndpointMetric {
Success(Host),
Failure(Host),
Expand All @@ -50,7 +51,7 @@ pub struct EndpointMetrics {
}

impl EndpointMetrics {
#[cfg(debug_assertions)]
/// This should only be used for testing.
pub fn noop() -> Self {
use slog::{o, Discard};
let (sender, _) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -135,7 +136,8 @@ impl EndpointMetricsProcessor {

pub async fn run(mut self) {
loop {
match self.receiver.recv().await {
let event = self.receiver.recv().await;
match event {
Some(EndpointMetric::Success(host)) => {
if let Some(count) = self.hosts.get(&host) {
count.store(0, Ordering::Relaxed);
Expand Down
25 changes: 16 additions & 9 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use crate::{

use crate::firehose::fetch_client::FetchClient;
use crate::firehose::interceptors::AuthInterceptor;
use anyhow::bail;
use futures03::StreamExt;
use http::uri::{Scheme, Uri};
use itertools::Itertools;
use slog::Logger;
use std::{collections::BTreeMap, fmt::Display, sync::Arc, time::Duration};
use tonic::codegen::InterceptedService;
Expand Down Expand Up @@ -136,6 +136,10 @@ impl FirehoseEndpoint {
}
}

/// Current number of consecutive errors recorded against this
/// endpoint's host, as tracked by the shared endpoint metrics.
pub fn current_error_count(&self) -> u64 {
    let host = &self.host;
    self.endpoint_metrics.get_count(host)
}

// we need to subtract 1 because there will always be a reference
// inside FirehoseEndpoints that is not used (it is always cloned).
pub fn has_subgraph_capacity(self: &Arc<Self>) -> bool {
Expand Down Expand Up @@ -370,14 +374,17 @@ impl FirehoseEndpoints {
// selects the FirehoseEndpoint with the least amount of references, which will help with splitting
// the load naively across the entire list.
pub fn random(&self) -> anyhow::Result<Arc<FirehoseEndpoint>> {
// This isn't really efficient; we could probably find a better way,
// since we have repeated endpoints which usually boil down to
// a handful of different hosts. Perhaps a map here would be
// better.
let endpoint = self
.0
.iter()
.min_by_key(|x| Arc::strong_count(x))
.ok_or(anyhow!("no available firehose endpoints"))?;
if !endpoint.has_subgraph_capacity() {
bail!("all connections saturated with {} connections, increase the firehose conn_pool_size or limit for the node", SUBGRAPHS_PER_CONN);
}
.filter(|x| x.has_subgraph_capacity())
.sorted_by_key(|x| x.current_error_count())
.find(|x| x.has_subgraph_capacity())
.ok_or(anyhow!("unable to get a connection: {} connections in use, increase the firehose conn_pool_size or limit for the node", self.0.len()))?;

// Cloning here ensures we have the correct count at any given time; if we return a reference it can be cloned later,
// which could cause a high number of endpoints to be given away before accounting for them.
Expand Down Expand Up @@ -479,7 +486,7 @@ mod test {
endpoints.remove("");

let err = endpoints.random().unwrap_err();
assert!(err.to_string().contains("no available firehose endpoints"));
assert!(err.to_string().contains("unable to get a connection"));
}

#[tokio::test]
Expand Down Expand Up @@ -511,7 +518,7 @@ mod test {
endpoints.remove("");

let err = endpoints.random().unwrap_err();
assert!(err.to_string().contains("no available firehose endpoints"));
assert!(err.to_string().contains("unable to get a connection"));
}

#[tokio::test]
Expand All @@ -535,6 +542,6 @@ mod test {
endpoints.remove("");

let err = endpoints.random().unwrap_err();
assert!(err.to_string().contains("no available firehose endpoints"));
assert!(err.to_string().contains("unable to get a connection"));
}
}
159 changes: 101 additions & 58 deletions graph/src/firehose/sf.firehose.v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ impl ForkStep {
/// Generated client implementations.
pub mod stream_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::http::Uri;
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
pub struct StreamClient<T> {
inner: tonic::client::Grpc<T>,
Expand Down Expand Up @@ -187,8 +187,9 @@ pub mod stream_client {
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error:
Into<StdError> + Send + Sync,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
StreamClient::new(InterceptedService::new(inner, interceptor))
}
Expand All @@ -210,27 +211,32 @@ pub mod stream_client {
pub async fn blocks(
&mut self,
request: impl tonic::IntoRequest<super::Request>,
) -> Result<tonic::Response<tonic::codec::Streaming<super::Response>>, tonic::Status>
{
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/sf.firehose.v2.Stream/Blocks");
) -> Result<
tonic::Response<tonic::codec::Streaming<super::Response>>,
tonic::Status,
> {
self.inner
.server_streaming(request.into_request(), path, codec)
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/sf.firehose.v2.Stream/Blocks",
);
self.inner.server_streaming(request.into_request(), path, codec).await
}
}
}
/// Generated client implementations.
pub mod fetch_client {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::http::Uri;
use tonic::codegen::*;
use tonic::codegen::http::Uri;
#[derive(Debug, Clone)]
pub struct FetchClient<T> {
inner: tonic::client::Grpc<T>,
Expand Down Expand Up @@ -274,8 +280,9 @@ pub mod fetch_client {
<T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
>,
>,
<T as tonic::codegen::Service<http::Request<tonic::body::BoxBody>>>::Error:
Into<StdError> + Send + Sync,
<T as tonic::codegen::Service<
http::Request<tonic::body::BoxBody>,
>>::Error: Into<StdError> + Send + Sync,
{
FetchClient::new(InterceptedService::new(inner, interceptor))
}
Expand All @@ -298,14 +305,19 @@ pub mod fetch_client {
&mut self,
request: impl tonic::IntoRequest<super::SingleBlockRequest>,
) -> Result<tonic::Response<super::SingleBlockResponse>, tonic::Status> {
self.inner.ready().await.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::new(
tonic::Code::Unknown,
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static("/sf.firehose.v2.Fetch/Block");
let path = http::uri::PathAndQuery::from_static(
"/sf.firehose.v2.Fetch/Block",
);
self.inner.unary(request.into_request(), path, codec).await
}
}
Expand All @@ -318,7 +330,9 @@ pub mod stream_server {
#[async_trait]
pub trait Stream: Send + Sync + 'static {
/// Server streaming response type for the Blocks method.
type BlocksStream: futures_core::Stream<Item = Result<super::Response, tonic::Status>>
type BlocksStream: futures_core::Stream<
Item = Result<super::Response, tonic::Status>,
>
+ Send
+ 'static;
async fn blocks(
Expand All @@ -345,7 +359,10 @@ pub mod stream_server {
send_compression_encodings: Default::default(),
}
}
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
Expand Down Expand Up @@ -373,7 +390,10 @@ pub mod stream_server {
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
Expand All @@ -382,11 +402,14 @@ pub mod stream_server {
"/sf.firehose.v2.Stream/Blocks" => {
#[allow(non_camel_case_types)]
struct BlocksSvc<T: Stream>(pub Arc<T>);
impl<T: Stream> tonic::server::ServerStreamingService<super::Request> for BlocksSvc<T> {
impl<T: Stream> tonic::server::ServerStreamingService<super::Request>
for BlocksSvc<T> {
type Response = super::Response;
type ResponseStream = T::BlocksStream;
type Future =
BoxFuture<tonic::Response<Self::ResponseStream>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::ResponseStream>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::Request>,
Expand All @@ -403,23 +426,28 @@ pub mod stream_server {
let inner = inner.0;
let method = BlocksSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let res = grpc.server_streaming(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap())
}),
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
Expand Down Expand Up @@ -478,7 +506,10 @@ pub mod fetch_server {
send_compression_encodings: Default::default(),
}
}
pub fn with_interceptor<F>(inner: T, interceptor: F) -> InterceptedService<Self, F>
pub fn with_interceptor<F>(
inner: T,
interceptor: F,
) -> InterceptedService<Self, F>
where
F: tonic::service::Interceptor,
{
Expand Down Expand Up @@ -506,7 +537,10 @@ pub mod fetch_server {
type Response = http::Response<tonic::body::BoxBody>;
type Error = std::convert::Infallible;
type Future = BoxFuture<Self::Response, Self::Error>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
fn poll_ready(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<B>) -> Self::Future {
Expand All @@ -515,9 +549,13 @@ pub mod fetch_server {
"/sf.firehose.v2.Fetch/Block" => {
#[allow(non_camel_case_types)]
struct BlockSvc<T: Fetch>(pub Arc<T>);
impl<T: Fetch> tonic::server::UnaryService<super::SingleBlockRequest> for BlockSvc<T> {
impl<T: Fetch> tonic::server::UnaryService<super::SingleBlockRequest>
for BlockSvc<T> {
type Response = super::SingleBlockResponse;
type Future = BoxFuture<tonic::Response<Self::Response>, tonic::Status>;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::SingleBlockRequest>,
Expand All @@ -534,23 +572,28 @@ pub mod fetch_server {
let inner = inner.0;
let method = BlockSvc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => Box::pin(async move {
Ok(http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap())
}),
_ => {
Box::pin(async move {
Ok(
http::Response::builder()
.status(200)
.header("grpc-status", "12")
.header("content-type", "application/grpc")
.body(empty_body())
.unwrap(),
)
})
}
}
}
}
Expand Down
Loading

0 comments on commit 8e022c5

Please sign in to comment.