Skip to content

Commit

Permalink
Add deployment_status metric (#5720)
Browse files Browse the repository at this point in the history
* graph, core: add deployment_status metric

* update news with the implemented changes
  • Loading branch information
isum authored Jan 16, 2025
1 parent 2be1cc3 commit bd0ab9a
Show file tree
Hide file tree
Showing 7 changed files with 262 additions and 97 deletions.
19 changes: 19 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# NEWS

## v0.36.1

### What's new

- A new `deployment_status` metric is added [(#5720)](https://github.com/graphprotocol/graph-node/pull/5720) with the
following behavior:
- Once graph-node has figured out that it should index a deployment, `deployment_status` is set to `1` _(starting)_;
- When the block stream is created and blocks are ready to be processed, `deployment_status` is set to `2` _(
running)_;
- When a deployment is unassigned, `deployment_status` is set to `3` _(stopped)_;
- If a temporary or permanent failure occurs, `deployment_status` is set to `4` _(failed)_;
- If indexing manages to recover from a temporary failure, the `deployment_status` is set back to `2` _(
running)_;

### Breaking changes

- The `deployment_failed` metric is removed and the failures are reported by the new `deployment_status`
metric. [(#5720)](https://github.com/graphprotocol/graph-node/pull/5720)

## v0.36.0

### Note on Firehose Extended Block Details
Expand Down
191 changes: 111 additions & 80 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::subgraph::runner::SubgraphRunner;
use graph::blockchain::block_stream::BlockStreamMetrics;
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
use graph::components::metrics::gas::GasMetrics;
use graph::components::metrics::subgraph::DeploymentStatusMetric;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::data::value::Word;
Expand Down Expand Up @@ -69,77 +70,91 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
let err_logger = logger.clone();
let instance_manager = self.cheap_clone();

let subgraph_start_future = async move {
match BlockchainKind::from_manifest(&manifest)? {
BlockchainKind::Arweave => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_arweave::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Ethereum => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Near => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_near::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Cosmos => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_cosmos::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Substreams => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_substreams::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.cheap_clone(),
manifest,
stop_block,
Box::new(graph_chain_substreams::TriggerProcessor::new(loc.clone())),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
let deployment_status_metric = self.new_deployment_status_metric(&loc);
deployment_status_metric.starting();

let subgraph_start_future = {
let deployment_status_metric = deployment_status_metric.clone();

async move {
match BlockchainKind::from_manifest(&manifest)? {
BlockchainKind::Arweave => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_arweave::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Ethereum => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Near => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_near::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Cosmos => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_cosmos::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Substreams => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_substreams::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.cheap_clone(),
manifest,
stop_block,
Box::new(graph_chain_substreams::TriggerProcessor::new(
loc.clone(),
)),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
}
}
};
Expand All @@ -152,12 +167,16 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
graph::spawn(async move {
match subgraph_start_future.await {
Ok(()) => {}
Err(err) => error!(
err_logger,
"Failed to start subgraph";
"error" => format!("{:#}", err),
"code" => LogCode::SubgraphStartFailure
),
Err(err) => {
deployment_status_metric.failed();

error!(
err_logger,
"Failed to start subgraph";
"error" => format!("{:#}", err),
"code" => LogCode::SubgraphStartFailure
);
}
}
});
}
Expand Down Expand Up @@ -217,6 +236,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
deployment_status_metric: DeploymentStatusMetric,
) -> anyhow::Result<SubgraphRunner<C, RuntimeHostBuilder<C>>>
where
C: Blockchain,
Expand Down Expand Up @@ -387,6 +407,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
deployment.hash.as_str(),
stopwatch_metrics.clone(),
deployment_status_metric,
));

let block_stream_metrics = Arc::new(BlockStreamMetrics::new(
Expand Down Expand Up @@ -496,7 +517,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
<C as Blockchain>::MappingTrigger: ToAscPtr,
{
let registry = self.metrics_registry.cheap_clone();
let subgraph_metrics_unregister = runner.metrics.subgraph.cheap_clone();
let subgraph_metrics = runner.metrics.subgraph.cheap_clone();

// Keep restarting the subgraph until it terminates. The subgraph
// will usually only run once, but is restarted whenever a block
Expand All @@ -513,20 +534,30 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
// https://github.com/tokio-rs/tokio/issues/3493.
graph::spawn_thread(deployment.to_string(), move || {
match graph::block_on(task::unconstrained(runner.run())) {
Ok(()) => {}
Ok(()) => {
subgraph_metrics.deployment_status.stopped();
}
Err(SubgraphRunnerError::Duplicate) => {
// We do not need to unregister metrics because they are unique per subgraph
// and another runner is still active.
return;
}
Err(err) => {
error!(&logger, "Subgraph instance failed to run: {:#}", err);
subgraph_metrics.deployment_status.failed();
}
}

subgraph_metrics_unregister.unregister(registry);
subgraph_metrics.unregister(registry);
});

Ok(())
}

pub fn new_deployment_status_metric(
&self,
deployment: &DeploymentLocator,
) -> DeploymentStatusMetric {
DeploymentStatusMetric::register(&self.metrics_registry, deployment)
}
}
6 changes: 4 additions & 2 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ where

debug!(self.logger, "Starting block stream");

self.metrics.subgraph.deployment_status.running();

// Process events from the stream as long as no restart is needed
loop {
let event = {
Expand Down Expand Up @@ -876,7 +878,7 @@ where
self.state.should_try_unfail_non_deterministic = false;

if let UnfailOutcome::Unfailed = outcome {
self.metrics.stream.deployment_failed.set(0.0);
self.metrics.subgraph.deployment_status.running();
self.state.backoff.reset();
}
}
Expand Down Expand Up @@ -909,7 +911,7 @@ where

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
self.metrics.stream.deployment_failed.set(1.0);
self.metrics.subgraph.deployment_status.failed();
let last_good_block = self
.inputs
.store
Expand Down
9 changes: 0 additions & 9 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,6 @@ where
#[derive(Clone)]
pub struct BlockStreamMetrics {
pub deployment_head: Box<Gauge>,
pub deployment_failed: Box<Gauge>,
pub reverted_blocks: Gauge,
pub stopwatch: StopwatchMetrics,
}
Expand Down Expand Up @@ -605,16 +604,8 @@ impl BlockStreamMetrics {
labels.clone(),
)
.expect("failed to create `deployment_head` gauge");
let deployment_failed = registry
.new_gauge(
"deployment_failed",
"Boolean gauge to indicate whether the deployment has failed (1 == failed)",
labels,
)
.expect("failed to create `deployment_failed` gauge");
Self {
deployment_head,
deployment_failed,
reverted_blocks,
stopwatch,
}
Expand Down
Loading

0 comments on commit bd0ab9a

Please sign in to comment.