Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deployment_status metric #5720

Merged
merged 2 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
# NEWS

## v0.36.1

### What's new

- A new `deployment_status` metric is added [(#5720)](https://github.com/graphprotocol/graph-node/pull/5720) with the
following behavior:
- Once graph-node has figured out that it should index a deployment, `deployment_status` is set to `1` _(starting)_;
- When the block stream is created and blocks are ready to be processed, `deployment_status` is set to `2` _(
running)_;
- When a deployment is unassigned, `deployment_status` is set to `3` _(stopped)_;
- If a temporary or permanent failure occurs, `deployment_status` is set to `4` _(failed)_;
- If indexing manages to recover from a temporary failure, the `deployment_status` is set back to `2` _(
running)_;

### Breaking changes

- The `deployment_failed` metric is removed and the failures are reported by the new `deployment_status`
metric. [(#5720)](https://github.com/graphprotocol/graph-node/pull/5720)

## v0.36.0

### Note on Firehose Extended Block Details
Expand Down
191 changes: 111 additions & 80 deletions core/src/subgraph/instance_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::subgraph::runner::SubgraphRunner;
use graph::blockchain::block_stream::BlockStreamMetrics;
use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities};
use graph::components::metrics::gas::GasMetrics;
use graph::components::metrics::subgraph::DeploymentStatusMetric;
use graph::components::subgraph::ProofOfIndexingVersion;
use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6};
use graph::data::value::Word;
Expand Down Expand Up @@ -69,77 +70,91 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
let err_logger = logger.clone();
let instance_manager = self.cheap_clone();

let subgraph_start_future = async move {
match BlockchainKind::from_manifest(&manifest)? {
BlockchainKind::Arweave => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_arweave::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Ethereum => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Near => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_near::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Cosmos => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_cosmos::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Substreams => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_substreams::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.cheap_clone(),
manifest,
stop_block,
Box::new(graph_chain_substreams::TriggerProcessor::new(loc.clone())),
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
let deployment_status_metric = self.new_deployment_status_metric(&loc);
deployment_status_metric.starting();

let subgraph_start_future = {
let deployment_status_metric = deployment_status_metric.clone();

async move {
match BlockchainKind::from_manifest(&manifest)? {
BlockchainKind::Arweave => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_arweave::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Ethereum => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_ethereum::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Near => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_near::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Cosmos => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_cosmos::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.clone(),
manifest,
stop_block,
Box::new(SubgraphTriggerProcessor {}),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
BlockchainKind::Substreams => {
let runner = instance_manager
.build_subgraph_runner::<graph_chain_substreams::Chain>(
logger.clone(),
self.env_vars.cheap_clone(),
loc.cheap_clone(),
manifest,
stop_block,
Box::new(graph_chain_substreams::TriggerProcessor::new(
loc.clone(),
)),
deployment_status_metric,
)
.await?;

self.start_subgraph_inner(logger, loc, runner).await
}
}
}
};
Expand All @@ -152,12 +167,16 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager<
graph::spawn(async move {
match subgraph_start_future.await {
Ok(()) => {}
Err(err) => error!(
err_logger,
"Failed to start subgraph";
"error" => format!("{:#}", err),
"code" => LogCode::SubgraphStartFailure
),
Err(err) => {
deployment_status_metric.failed();

error!(
err_logger,
"Failed to start subgraph";
"error" => format!("{:#}", err),
"code" => LogCode::SubgraphStartFailure
);
}
}
});
}
Expand Down Expand Up @@ -217,6 +236,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
manifest: serde_yaml::Mapping,
stop_block: Option<BlockNumber>,
tp: Box<dyn TriggerProcessor<C, RuntimeHostBuilder<C>>>,
deployment_status_metric: DeploymentStatusMetric,
) -> anyhow::Result<SubgraphRunner<C, RuntimeHostBuilder<C>>>
where
C: Blockchain,
Expand Down Expand Up @@ -387,6 +407,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
registry.cheap_clone(),
deployment.hash.as_str(),
stopwatch_metrics.clone(),
deployment_status_metric,
));

let block_stream_metrics = Arc::new(BlockStreamMetrics::new(
Expand Down Expand Up @@ -496,7 +517,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
<C as Blockchain>::MappingTrigger: ToAscPtr,
{
let registry = self.metrics_registry.cheap_clone();
let subgraph_metrics_unregister = runner.metrics.subgraph.cheap_clone();
let subgraph_metrics = runner.metrics.subgraph.cheap_clone();

// Keep restarting the subgraph until it terminates. The subgraph
// will usually only run once, but is restarted whenever a block
Expand All @@ -513,20 +534,30 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
// https://github.com/tokio-rs/tokio/issues/3493.
graph::spawn_thread(deployment.to_string(), move || {
match graph::block_on(task::unconstrained(runner.run())) {
Ok(()) => {}
Ok(()) => {
subgraph_metrics.deployment_status.stopped();
}
Err(SubgraphRunnerError::Duplicate) => {
// We do not need to unregister metrics because they are unique per subgraph
// and another runner is still active.
return;
}
Err(err) => {
error!(&logger, "Subgraph instance failed to run: {:#}", err);
subgraph_metrics.deployment_status.failed();
}
}

subgraph_metrics_unregister.unregister(registry);
subgraph_metrics.unregister(registry);
});

Ok(())
}

pub fn new_deployment_status_metric(
&self,
deployment: &DeploymentLocator,
) -> DeploymentStatusMetric {
DeploymentStatusMetric::register(&self.metrics_registry, deployment)
}
}
6 changes: 4 additions & 2 deletions core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ where

debug!(self.logger, "Starting block stream");

self.metrics.subgraph.deployment_status.running();
isum marked this conversation as resolved.
Show resolved Hide resolved

// Process events from the stream as long as no restart is needed
loop {
let event = {
Expand Down Expand Up @@ -876,7 +878,7 @@ where
self.state.should_try_unfail_non_deterministic = false;

if let UnfailOutcome::Unfailed = outcome {
self.metrics.stream.deployment_failed.set(0.0);
self.metrics.subgraph.deployment_status.running();
self.state.backoff.reset();
}
}
Expand Down Expand Up @@ -909,7 +911,7 @@ where

// Handle unexpected stream errors by marking the subgraph as failed.
Err(e) => {
self.metrics.stream.deployment_failed.set(1.0);
self.metrics.subgraph.deployment_status.failed();
let last_good_block = self
.inputs
.store
Expand Down
9 changes: 0 additions & 9 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,6 @@ where
#[derive(Clone)]
pub struct BlockStreamMetrics {
pub deployment_head: Box<Gauge>,
pub deployment_failed: Box<Gauge>,
pub reverted_blocks: Gauge,
pub stopwatch: StopwatchMetrics,
}
Expand Down Expand Up @@ -605,16 +604,8 @@ impl BlockStreamMetrics {
labels.clone(),
)
.expect("failed to create `deployment_head` gauge");
let deployment_failed = registry
.new_gauge(
"deployment_failed",
"Boolean gauge to indicate whether the deployment has failed (1 == failed)",
labels,
)
.expect("failed to create `deployment_failed` gauge");
Self {
deployment_head,
deployment_failed,
reverted_blocks,
stopwatch,
}
Expand Down
Loading
Loading