Skip to content

Commit

Permalink
Add interceptors for metrics and auth
Browse files Browse the repository at this point in the history
  • Loading branch information
mangas committed Mar 8, 2023
1 parent ef9a318 commit d076349
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 24 deletions.
39 changes: 31 additions & 8 deletions graph/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,34 @@ use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
/// HostCount is the underlying structure to keep the count,
/// we require that all the hosts are known ahead of time, this way we can
/// avoid locking since we don't need to modify the entire struture.
type HostCount = Arc<HashMap<String, AtomicU64>>;
type HostCount = Arc<HashMap<Host, AtomicU64>>;

/// Newtype wrapper for a host identifier.
///
/// Stored as `Box<str>` (rather than `String`) because the value is
/// immutable once created, which drops the capacity field and keeps the
/// type one word smaller. Usable as a `HashMap` key via `Eq + Hash`.
#[derive(Debug, Eq, PartialEq, Hash, Clone)]
pub struct Host(Box<str>);

impl From<String> for Host {
    fn from(value: String) -> Self {
        // Reuses the String's existing allocation; no copy is made.
        Self(value.into_boxed_str())
    }
}

impl From<&str> for Host {
    fn from(value: &str) -> Self {
        // Copies the borrowed slice into a fresh boxed str.
        Self(Box::from(value))
    }
}

impl std::ops::Deref for Host {
    type Target = str;

    /// Lets a `Host` be used anywhere a `&str` is expected.
    fn deref(&self) -> &Self::Target {
        self.0.as_ref()
    }
}

enum EndpointMetric {
Success(String),
Failure(String),
Success(Host),
Failure(Host),
}

#[derive(Debug)]
Expand All @@ -39,15 +62,15 @@ impl EndpointMetrics {
}
}

pub fn success(&self, host: String) -> anyhow::Result<()> {
pub fn success(&self, host: Host) -> anyhow::Result<()> {
if let Err(e) = self.sender.send(EndpointMetric::Success(host)) {
warn!(self.logger, "metrics channel has been closed: {}", e)
}

Ok(())
}

pub fn failure(&self, host: String) -> anyhow::Result<()> {
pub fn failure(&self, host: Host) -> anyhow::Result<()> {
if let Err(e) = self.sender.send(EndpointMetric::Failure(host)) {
warn!(self.logger, "metrics channel has been closed: {}", e)
}
Expand All @@ -57,7 +80,7 @@ impl EndpointMetrics {

/// Returns the current error count of a host or 0 if the host
/// doesn't have a value on the map.
pub fn get_count(&self, host: &str) -> u64 {
pub fn get_count(&self, host: &Host) -> u64 {
self.hosts
.get(host)
.map(|c| c.load(Ordering::Relaxed))
Expand Down Expand Up @@ -91,13 +114,13 @@ impl EndpointMetricsProcessor {
pub fn new(logger: Logger, hosts: &[String]) -> (Self, EndpointMetrics) {
let (sender, receiver) = mpsc::unbounded_channel();
let hosts = Arc::new(HashMap::from_iter(
hosts.iter().map(|h| (h.clone(), AtomicU64::new(0))),
hosts.iter().map(|h| ((*h).into(), AtomicU64::new(0))),
));

(
Self {
logger: logger.clone(),
hosts: hosts.clone(),
hosts,
receiver,
},
EndpointMetrics {
Expand Down
55 changes: 39 additions & 16 deletions graph/src/firehose/endpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
blockchain::BlockPtr,
cheap_clone::CheapClone,
components::store::BlockNumber,
endpoint::EndpointMetrics,
endpoint::{EndpointMetrics, Host},
firehose::decode_firehose_block,
prelude::{anyhow, debug, info},
substreams,
Expand All @@ -24,7 +24,7 @@ use tonic::{
transport::{Channel, ClientTlsConfig},
};

use super::{codec as firehose, stream_client::StreamClient};
use super::{codec as firehose, interceptors::MetricsInterceptor, stream_client::StreamClient};

/// This is constant because we found this magic number of connections after
/// which the grpc connections start to hang.
Expand All @@ -34,6 +34,7 @@ pub const SUBGRAPHS_PER_CONN: usize = 100;
#[derive(Debug)]
pub struct FirehoseEndpoint {
pub provider: String,
pub host: Host,
pub auth: AuthInterceptor,
pub filters_enabled: bool,
pub compression_enabled: bool,
Expand Down Expand Up @@ -80,6 +81,7 @@ impl FirehoseEndpoint {
.as_ref()
.parse::<Uri>()
.expect("the url should have been validated by now, so it is a valid Uri");
let host = Host::from(uri.to_string());

let endpoint_builder = match uri.scheme().unwrap_or(&Scheme::HTTP).as_str() {
"http" => Channel::builder(uri),
Expand Down Expand Up @@ -130,6 +132,7 @@ impl FirehoseEndpoint {
compression_enabled,
subgraph_limit,
endpoint_metrics,
host,
}
}

Expand All @@ -142,10 +145,19 @@ impl FirehoseEndpoint {

fn new_client(
&self,
) -> FetchClient<InterceptedService<Channel, impl tonic::service::Interceptor>> {
let mut client =
FetchClient::with_interceptor(self.channel.cheap_clone(), self.auth.clone())
.accept_compressed(CompressionEncoding::Gzip);
) -> FetchClient<
InterceptedService<MetricsInterceptor<Channel>, impl tonic::service::Interceptor>,
> {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
};

let mut client: FetchClient<
InterceptedService<MetricsInterceptor<Channel>, AuthInterceptor>,
> = FetchClient::with_interceptor(metrics, self.auth.clone())
.accept_compressed(CompressionEncoding::Gzip);

if self.compression_enabled {
client = client.send_compressed(CompressionEncoding::Gzip);
Expand All @@ -156,10 +168,17 @@ impl FirehoseEndpoint {

fn new_stream_client(
&self,
) -> StreamClient<InterceptedService<Channel, impl tonic::service::Interceptor>> {
let mut client =
StreamClient::with_interceptor(self.channel.cheap_clone(), self.auth.clone())
.accept_compressed(CompressionEncoding::Gzip);
) -> StreamClient<
InterceptedService<MetricsInterceptor<Channel>, impl tonic::service::Interceptor>,
> {
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
};

let mut client = StreamClient::with_interceptor(metrics, self.auth.clone())
.accept_compressed(CompressionEncoding::Gzip);

if self.compression_enabled {
client = client.send_compressed(CompressionEncoding::Gzip);
Expand All @@ -171,13 +190,17 @@ impl FirehoseEndpoint {
fn new_substreams_client(
&self,
) -> substreams::stream_client::StreamClient<
InterceptedService<Channel, impl tonic::service::Interceptor>,
InterceptedService<MetricsInterceptor<Channel>, impl tonic::service::Interceptor>,
> {
let mut client = substreams::stream_client::StreamClient::with_interceptor(
self.channel.cheap_clone(),
self.auth.clone(),
)
.accept_compressed(CompressionEncoding::Gzip);
let metrics = MetricsInterceptor {
metrics: self.endpoint_metrics.cheap_clone(),
service: self.channel.cheap_clone(),
host: self.host.clone(),
};

let mut client =
substreams::stream_client::StreamClient::with_interceptor(metrics, self.auth.clone())
.accept_compressed(CompressionEncoding::Gzip);

if self.compression_enabled {
client = client.send_compressed(CompressionEncoding::Gzip);
Expand Down
51 changes: 51 additions & 0 deletions graph/src/firehose/interceptors.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
use std::future::Future;
use std::pin::Pin;
use std::{fmt, sync::Arc};

use tonic::{
codegen::Service,
metadata::{Ascii, MetadataValue},
service::Interceptor,
};

use crate::endpoint::{EndpointMetrics, Host};

#[derive(Clone)]
pub struct AuthInterceptor {
pub token: Option<MetadataValue<Ascii>>,
Expand All @@ -26,3 +33,47 @@ impl Interceptor for AuthInterceptor {
Ok(req)
}
}

/// Service wrapper that reports each call's outcome (success/failure) to
/// `EndpointMetrics`, keyed by `host`, before handing the request to the
/// wrapped inner service.
///
/// The fields must be `pub`: `endpoints.rs` constructs this type with a
/// struct literal (`MetricsInterceptor { metrics, service, host }`) from a
/// sibling module, which is not allowed for private fields (E0451).
pub struct MetricsInterceptor<S> {
    /// Shared per-host success/failure counters.
    pub metrics: Arc<EndpointMetrics>,
    /// The inner service that actually performs the call.
    pub service: S,
    /// Host key under which this endpoint's calls are counted.
    pub host: Host,
}

// Wraps the inner service so every request's outcome is counted against
// `self.host` in `EndpointMetrics`, without otherwise changing behavior.
impl<S, Request> Service<Request> for MetricsInterceptor<S>
where
S: Service<Request>,
// The inner future is boxed in `call`, so it must not borrow from `self`.
S::Future: 'static,
// NOTE(review): this `Debug` bound appears unused in the body shown here —
// confirm whether it can be dropped.
Request: fmt::Debug,
{
// Response and error types pass through from the wrapped service unchanged.
type Response = S::Response;

type Error = S::Error;

// Boxed, type-erased future. NOTE(review): no `+ Send` bound — confirm the
// future is never moved across threads (e.g. via `tokio::spawn`).
type Future = Pin<Box<dyn Future<Output = <S::Future as Future>::Output>>>;

// Readiness is delegated directly to the wrapped service.
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
self.service.poll_ready(cx)
}

fn call(&mut self, req: Request) -> Self::Future {
// Clone the host key and metrics handle so the async block can own
// them beyond the lifetime of `&mut self`.
let host = self.host.clone();
let metrics = self.metrics.clone();

// Start the inner call eagerly; only the await happens inside the block.
let fut = self.service.call(req);
let res = async move {
let res = fut.await;
// Record the outcome. `success`/`failure` only log when the metrics
// channel is closed, so their `Result` can be safely discarded.
if res.is_ok() {
metrics.success(host).unwrap_or_default();
} else {
metrics.failure(host).unwrap_or_default();
}
// The original response/error is returned untouched to the caller.
res
};

Box::pin(res)
}
}

0 comments on commit d076349

Please sign in to comment.