Skip to content

Commit

Permalink
graph, core: add deployment_status metric
Browse files Browse the repository at this point in the history
  • Loading branch information
isum committed Jan 16, 2025
1 parent 2be1cc3 commit 295af3d
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 97 deletions.
191 changes: 111 additions & 80 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::subgraph::runner::SubgraphRunner;
use graph::blockchain::block_stream::BlockStreamMetrics;
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
use graph::components::metrics::gas::GasMetrics;
use graph::components::metrics::subgraph::DeploymentStatusMetric;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::data::value::Word;
Expand Down Expand Up @@ -69,77 +70,91 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
let err_logger = logger.clone();
let instance_manager = self.cheap_clone();

let subgraph_start_future = async move {
match BlockchainKind::from_manifest(&manifest)? {
BlockchainKind::Arweave => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_arweave::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Ethereum => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Near => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_near::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Cosmos => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_cosmos::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Substreams => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_substreams::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.cheap_clone(),
manifest,
stop_block,
Box::new(graph_chain_substreams::TriggerProcessor::new(loc.clone())),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
let deployment_status_metric = self.new_deployment_status_metric(&loc);
deployment_status_metric.starting();

let subgraph_start_future = {
let deployment_status_metric = deployment_status_metric.clone();

async move {
match BlockchainKind::from_manifest(&manifest)? {
BlockchainKind::Arweave => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_arweave::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Ethereum => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Near => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_near::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Cosmos => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_cosmos::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Substreams => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_substreams::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.cheap_clone(),
manifest,
stop_block,
Box::new(graph_chain_substreams::TriggerProcessor::new(
loc.clone(),
)),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
}
}
};
Expand All @@ -152,12 +167,16 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
graph::spawn(async move {
match subgraph_start_future.await {
Ok(()) => {}
Err(err) => error!(
err_logger,
"Failed to start subgraph";
"error" => format!("{:#}", err),
"code" => LogCode::SubgraphStartFailure
),
Err(err) => {
deployment_status_metric.failed();

error!(
err_logger,
"Failed to start subgraph";
"error" => format!("{:#}", err),
"code" => LogCode::SubgraphStartFailure
);
}
}
});
}
Expand Down Expand Up @@ -217,6 +236,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
deployment_status_metric: DeploymentStatusMetric,
) -> anyhow::Result<SubgraphRunner<C, RuntimeHostBuilder<C>>>
where
C: Blockchain,
Expand Down Expand Up @@ -387,6 +407,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
deployment.hash.as_str(),
stopwatch_metrics.clone(),
deployment_status_metric,
));

let block_stream_metrics = Arc::new(BlockStreamMetrics::new(
Expand Down Expand Up @@ -496,7 +517,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
<C as Blockchain>::MappingTrigger: ToAscPtr,
{
let registry = self.metrics_registry.cheap_clone();
let subgraph_metrics_unregister = runner.metrics.subgraph.cheap_clone();
let subgraph_metrics = runner.metrics.subgraph.cheap_clone();

// Keep restarting the subgraph until it terminates. The subgraph
// will usually only run once, but is restarted whenever a block
Expand All @@ -513,20 +534,30 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
// https://github.com/tokio-rs/tokio/issues/3493.
graph::spawn_thread(deployment.to_string(), move || {
match graph::block_on(task::unconstrained(runner.run())) {
Ok(()) => {}
Ok(()) => {
subgraph_metrics.deployment_status.stopped();
}
Err(SubgraphRunnerError::Duplicate) => {
// We do not need to unregister metrics because they are unique per subgraph
// and another runner is still active.
return;
}
Err(err) => {
error!(&logger, "Subgraph instance failed to run: {:#}", err);
subgraph_metrics.deployment_status.failed();
}
}

subgraph_metrics_unregister.unregister(registry);
subgraph_metrics.unregister(registry);
});

Ok(())
}

/// Creates the status metric for `deployment` by registering it with this
/// manager's metrics registry.
///
/// The returned `DeploymentStatusMetric` is what callers use to report
/// state transitions for the deployment.
pub fn new_deployment_status_metric(
    &self,
    deployment: &DeploymentLocator,
) -> DeploymentStatusMetric {
    let registry = &self.metrics_registry;

    DeploymentStatusMetric::register(registry, deployment)
}
}
6 changes: 4 additions & 2 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ where

debug!(self.logger, "Starting block stream");

self.metrics.subgraph.deployment_status.running();

// Process events from the stream as long as no restart is needed
loop {
let event = {
Expand Down Expand Up @@ -876,7 +878,7 @@ where
self.state.should_try_unfail_non_deterministic = false;

if let UnfailOutcome::Unfailed = outcome {
self.metrics.stream.deployment_failed.set(0.0);
self.metrics.subgraph.deployment_status.running();
self.state.backoff.reset();
}
}
Expand Down Expand Up @@ -909,7 +911,7 @@ where

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
self.metrics.stream.deployment_failed.set(1.0);
self.metrics.subgraph.deployment_status.failed();
let last_good_block = self
.inputs
.store
Expand Down
9 changes: 0 additions & 9 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,6 @@ where
#[derive(Clone)]
pub struct BlockStreamMetrics {
pub deployment_head: Box<Gauge>,
pub deployment_failed: Box<Gauge>,
pub reverted_blocks: Gauge,
pub stopwatch: StopwatchMetrics,
}
Expand Down Expand Up @@ -605,16 +604,8 @@ impl BlockStreamMetrics {
labels.clone(),
)
.expect("failed to create `deployment_head` gauge");
let deployment_failed = registry
.new_gauge(
"deployment_failed",
"Boolean gauge to indicate whether the deployment has failed (1 == failed)",
labels,
)
.expect("failed to create `deployment_failed` gauge");
Self {
deployment_head,
deployment_failed,
reverted_blocks,
stopwatch,
}
Expand Down
53 changes: 53 additions & 0 deletions graph/src/components/metrics/registry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use prometheus::IntGauge;
use prometheus::{labels, Histogram, IntCounterVec};
use slog::info;
use slog::warn;

use crate::components::metrics::{counter_with_labels, gauge_with_labels};
use crate::prelude::Collector;
Expand Down Expand Up @@ -168,6 +171,39 @@ impl MetricsRegistry {
};
}

/// Registers the collector `c` under `name`, replacing a previous
/// registration if the registry reports the metric as already registered.
///
/// Outcomes:
/// - registration succeeds: logs at info level and increments
///   `registered_metrics`;
/// - the metric is already registered: logs a warning, unregisters the
///   previous registration and registers `c` again through `register`;
/// - any other error: logs the error and increments `register_errors`.
pub fn register_or_replace<T>(&self, name: &str, c: Box<T>)
where
    T: Collector + Clone + 'static,
{
    let failure = match self.registry.register(c.clone()) {
        Ok(()) => {
            info!(self.logger, "metric [{}] successfully registered", name);
            self.registered_metrics.inc();
            return;
        }
        Err(failure) => failure,
    };

    if matches!(failure, PrometheusError::AlreadyReg) {
        warn!(
            self.logger,
            "metric [{}] is already registered; \
            the previous registration will be dropped so that the new metric can be used",
            name,
        );

        // The new collector is a duplicate of the registered one, so a
        // clone of it is enough to identify and drop the old registration.
        self.unregister(c.clone());

        self.register(name, c);
        return;
    }

    error!(
        self.logger,
        "registering metric [{}] failed: {:#}", name, failure,
    );
    self.register_errors.inc();
}

pub fn global_counter(
&self,
name: &str,
Expand Down Expand Up @@ -510,6 +546,23 @@ impl MetricsRegistry {
self.register(name, histograms.clone());
Ok(histograms)
}

pub fn new_int_gauge(
&self,
name: impl AsRef<str>,
help: impl AsRef<str>,
const_labels: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
) -> Result<IntGauge, PrometheusError> {
let opts = Opts::new(name.as_ref(), help.as_ref()).const_labels(
const_labels
.into_iter()
.map(|(a, b)| (a.into(), b.into()))
.collect(),
);
let gauge = IntGauge::with_opts(opts)?;
self.register_or_replace(name.as_ref(), Box::new(gauge.clone()));
Ok(gauge)
}
}

fn deployment_labels(subgraph: &str) -> HashMap<String, String> {
Expand Down
Loading

0 comments on commit 295af3d

Please sign in to comment.