From 295af3d91b9c9b8be571e119c0ac7ed6f3af9902 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Fri, 29 Nov 2024 10:13:14 +0200 Subject: [PATCH 1/2] graph, core: add deployment_status metric --- core/src/subgraph/instance_manager.rs | 191 +++++++++++++---------- core/src/subgraph/runner.rs | 6 +- graph/src/blockchain/block_stream.rs | 9 -- graph/src/components/metrics/registry.rs | 53 +++++++ graph/src/components/metrics/subgraph.rs | 71 ++++++++- tests/src/fixture/mod.rs | 10 ++ 6 files changed, 243 insertions(+), 97 deletions(-) diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index b255e9b3a0b..f9bb390d018 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -12,6 +12,7 @@ use crate::subgraph::runner::SubgraphRunner; use graph::blockchain::block_stream::BlockStreamMetrics; use graph::blockchain::{Blockchain, BlockchainKind, DataSource, NodeCapabilities}; use graph::components::metrics::gas::GasMetrics; +use graph::components::metrics::subgraph::DeploymentStatusMetric; use graph::components::subgraph::ProofOfIndexingVersion; use graph::data::subgraph::{UnresolvedSubgraphManifest, SPEC_VERSION_0_0_6}; use graph::data::value::Word; @@ -69,77 +70,91 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< let err_logger = logger.clone(); let instance_manager = self.cheap_clone(); - let subgraph_start_future = async move { - match BlockchainKind::from_manifest(&manifest)? { - BlockchainKind::Arweave => { - let runner = instance_manager - .build_subgraph_runner::( - logger.clone(), - self.env_vars.cheap_clone(), - loc.clone(), - manifest, - stop_block, - Box::new(SubgraphTriggerProcessor {}), - ) - .await?; - - self.start_subgraph_inner(logger, loc, runner).await - } - BlockchainKind::Ethereum => { - let runner = instance_manager - .build_subgraph_runner::( - logger.clone(), - self.env_vars.cheap_clone(), - loc.clone(), - manifest, - stop_block, - Box::new(SubgraphTriggerProcessor {}), - ) - .await?; - - self.start_subgraph_inner(logger, loc, runner).await - } - BlockchainKind::Near => { - let runner = instance_manager - .build_subgraph_runner::( - logger.clone(), - self.env_vars.cheap_clone(), - loc.clone(), - manifest, - stop_block, - Box::new(SubgraphTriggerProcessor {}), - ) - .await?; - - self.start_subgraph_inner(logger, loc, runner).await - } - BlockchainKind::Cosmos => { - let runner = instance_manager - .build_subgraph_runner::( - logger.clone(), - self.env_vars.cheap_clone(), - loc.clone(), - manifest, - stop_block, - Box::new(SubgraphTriggerProcessor {}), - ) - .await?; - - self.start_subgraph_inner(logger, loc, runner).await - } - BlockchainKind::Substreams => { - let runner = instance_manager - .build_subgraph_runner::( - logger.clone(), - self.env_vars.cheap_clone(), - loc.cheap_clone(), - manifest, - stop_block, - Box::new(graph_chain_substreams::TriggerProcessor::new(loc.clone())), - ) - .await?; - - self.start_subgraph_inner(logger, loc, runner).await + let deployment_status_metric = self.new_deployment_status_metric(&loc); + deployment_status_metric.starting(); + + let subgraph_start_future = { + let deployment_status_metric = deployment_status_metric.clone(); + + async move { + match BlockchainKind::from_manifest(&manifest)? { + BlockchainKind::Arweave => { + let runner = instance_manager + .build_subgraph_runner::( + logger.clone(), + self.env_vars.cheap_clone(), + loc.clone(), + manifest, + stop_block, + Box::new(SubgraphTriggerProcessor {}), + deployment_status_metric, + ) + .await?; + + self.start_subgraph_inner(logger, loc, runner).await + } + BlockchainKind::Ethereum => { + let runner = instance_manager + .build_subgraph_runner::( + logger.clone(), + self.env_vars.cheap_clone(), + loc.clone(), + manifest, + stop_block, + Box::new(SubgraphTriggerProcessor {}), + deployment_status_metric, + ) + .await?; + + self.start_subgraph_inner(logger, loc, runner).await + } + BlockchainKind::Near => { + let runner = instance_manager + .build_subgraph_runner::( + logger.clone(), + self.env_vars.cheap_clone(), + loc.clone(), + manifest, + stop_block, + Box::new(SubgraphTriggerProcessor {}), + deployment_status_metric, + ) + .await?; + + self.start_subgraph_inner(logger, loc, runner).await + } + BlockchainKind::Cosmos => { + let runner = instance_manager + .build_subgraph_runner::( + logger.clone(), + self.env_vars.cheap_clone(), + loc.clone(), + manifest, + stop_block, + Box::new(SubgraphTriggerProcessor {}), + deployment_status_metric, + ) + .await?; + + self.start_subgraph_inner(logger, loc, runner).await + } + BlockchainKind::Substreams => { + let runner = instance_manager + .build_subgraph_runner::( + logger.clone(), + self.env_vars.cheap_clone(), + loc.cheap_clone(), + manifest, + stop_block, + Box::new(graph_chain_substreams::TriggerProcessor::new( + loc.clone(), + )), + deployment_status_metric, + ) + .await?; + + self.start_subgraph_inner(logger, loc, runner).await + } } } }; @@ -152,12 +167,16 @@ impl SubgraphInstanceManagerTrait for SubgraphInstanceManager< graph::spawn(async move { match subgraph_start_future.await { Ok(()) => {} - Err(err) => error!( - err_logger, - "Failed to start subgraph"; - "error" => format!("{:#}", err), - "code" => LogCode::SubgraphStartFailure - ), + Err(err) => { + deployment_status_metric.failed(); + + error!( + err_logger, + "Failed to start subgraph"; + "error" => format!("{:#}", err), + "code" => LogCode::SubgraphStartFailure + ); + } } }); } @@ -217,6 +236,7 @@ impl SubgraphInstanceManager { manifest: serde_yaml::Mapping, stop_block: Option, tp: Box>>, + deployment_status_metric: DeploymentStatusMetric, ) -> anyhow::Result>> where C: Blockchain, @@ -387,6 +407,7 @@ impl SubgraphInstanceManager { registry.cheap_clone(), deployment.hash.as_str(), stopwatch_metrics.clone(), + deployment_status_metric, )); let block_stream_metrics = Arc::new(BlockStreamMetrics::new( @@ -496,7 +517,7 @@ impl SubgraphInstanceManager { ::MappingTrigger: ToAscPtr, { let registry = self.metrics_registry.cheap_clone(); - let subgraph_metrics_unregister = runner.metrics.subgraph.cheap_clone(); + let subgraph_metrics = runner.metrics.subgraph.cheap_clone(); // Keep restarting the subgraph until it terminates. The subgraph // will usually only run once, but is restarted whenever a block @@ -513,7 +534,9 @@ impl SubgraphInstanceManager { // https://github.com/tokio-rs/tokio/issues/3493. graph::spawn_thread(deployment.to_string(), move || { match graph::block_on(task::unconstrained(runner.run())) { - Ok(()) => {} + Ok(()) => { + subgraph_metrics.deployment_status.stopped(); + } Err(SubgraphRunnerError::Duplicate) => { // We do not need to unregister metrics because they are unique per subgraph // and another runner is still active. @@ -521,12 +544,20 @@ impl SubgraphInstanceManager { } Err(err) => { error!(&logger, "Subgraph instance failed to run: {:#}", err); + subgraph_metrics.deployment_status.failed(); } } - subgraph_metrics_unregister.unregister(registry); + subgraph_metrics.unregister(registry); }); Ok(()) } + + pub fn new_deployment_status_metric( + &self, + deployment: &DeploymentLocator, + ) -> DeploymentStatusMetric { + DeploymentStatusMetric::register(&self.metrics_registry, deployment) + } } diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 10e5ea4ed67..33b98b21ec7 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -244,6 +244,8 @@ where debug!(self.logger, "Starting block stream"); + self.metrics.subgraph.deployment_status.running(); + // Process events from the stream as long as no restart is needed loop { let event = { @@ -876,7 +878,7 @@ where self.state.should_try_unfail_non_deterministic = false; if let UnfailOutcome::Unfailed = outcome { - self.metrics.stream.deployment_failed.set(0.0); + self.metrics.subgraph.deployment_status.running(); self.state.backoff.reset(); } } @@ -909,7 +911,7 @@ where // Handle unexpected stream errors by marking the subgraph as failed. Err(e) => { - self.metrics.stream.deployment_failed.set(1.0); + self.metrics.subgraph.deployment_status.failed(); let last_good_block = self .inputs .store diff --git a/graph/src/blockchain/block_stream.rs b/graph/src/blockchain/block_stream.rs index 0daf4c33eda..5d9c8b3534a 100644 --- a/graph/src/blockchain/block_stream.rs +++ b/graph/src/blockchain/block_stream.rs @@ -573,7 +573,6 @@ where #[derive(Clone)] pub struct BlockStreamMetrics { pub deployment_head: Box, - pub deployment_failed: Box, pub reverted_blocks: Gauge, pub stopwatch: StopwatchMetrics, } @@ -605,16 +604,8 @@ impl BlockStreamMetrics { labels.clone(), ) .expect("failed to create `deployment_head` gauge"); - let deployment_failed = registry - .new_gauge( - "deployment_failed", - "Boolean gauge to indicate whether the deployment has failed (1 == failed)", - labels, - ) - .expect("failed to create `deployment_failed` gauge"); Self { deployment_head, - deployment_failed, reverted_blocks, stopwatch, } diff --git a/graph/src/components/metrics/registry.rs b/graph/src/components/metrics/registry.rs index 7fa5b903b05..efe5bf3cf47 100644 --- a/graph/src/components/metrics/registry.rs +++ b/graph/src/components/metrics/registry.rs @@ -1,7 +1,10 @@ use std::collections::HashMap; use std::sync::{Arc, RwLock}; +use prometheus::IntGauge; use prometheus::{labels, Histogram, IntCounterVec}; +use slog::info; +use slog::warn; use crate::components::metrics::{counter_with_labels, gauge_with_labels}; use crate::prelude::Collector; @@ -168,6 +171,39 @@ impl MetricsRegistry { }; } + pub fn register_or_replace(&self, name: &str, c: Box) + where + T: Collector + Clone + 'static, + { + match self.registry.register(c.clone()) { + Ok(()) => { + info!(self.logger, "metric [{}] successfully registered", name); + self.registered_metrics.inc(); + } + Err(PrometheusError::AlreadyReg) => { + warn!( + self.logger, + "metric [{}] is already registered; \ + the previous registration will be dropped so that the new metric can be used", + name, + ); + + // Since the current metric is a duplicate, + // we can use it to unregister the previous registration. + self.unregister(c.clone()); + + self.register(name, c); + } + Err(err) => { + error!( + self.logger, + "registering metric [{}] failed: {:#}", name, err, + ); + self.register_errors.inc(); + } + } + } + pub fn global_counter( &self, name: &str, @@ -510,6 +546,23 @@ impl MetricsRegistry { self.register(name, histograms.clone()); Ok(histograms) } + + pub fn new_int_gauge( + &self, + name: impl AsRef, + help: impl AsRef, + const_labels: impl IntoIterator, impl Into)>, + ) -> Result { + let opts = Opts::new(name.as_ref(), help.as_ref()).const_labels( + const_labels + .into_iter() + .map(|(a, b)| (a.into(), b.into())) + .collect(), + ); + let gauge = IntGauge::with_opts(opts)?; + self.register_or_replace(name.as_ref(), Box::new(gauge.clone())); + Ok(gauge) + } } fn deployment_labels(subgraph: &str) -> HashMap { diff --git a/graph/src/components/metrics/subgraph.rs b/graph/src/components/metrics/subgraph.rs index d9b68da8631..ba62ebf3411 100644 --- a/graph/src/components/metrics/subgraph.rs +++ b/graph/src/components/metrics/subgraph.rs @@ -1,21 +1,24 @@ -use prometheus::Counter; - -use crate::blockchain::block_stream::BlockStreamMetrics; -use crate::prelude::{Gauge, Histogram, HostMetrics}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use prometheus::Counter; +use prometheus::IntGauge; + use super::stopwatch::StopwatchMetrics; use super::MetricsRegistry; +use crate::blockchain::block_stream::BlockStreamMetrics; +use crate::components::store::DeploymentLocator; +use crate::prelude::{Gauge, Histogram, HostMetrics}; pub struct SubgraphInstanceMetrics { pub block_trigger_count: Box, pub block_processing_duration: Box, pub block_ops_transaction_duration: Box, pub firehose_connection_errors: Counter, - pub stopwatch: StopwatchMetrics, + pub deployment_status: DeploymentStatusMetric, + trigger_processing_duration: Box, blocks_processed_secs: Box, blocks_processed_count: Box, @@ -26,6 +29,7 @@ impl SubgraphInstanceMetrics { registry: Arc, subgraph_hash: &str, stopwatch: StopwatchMetrics, + deployment_status: DeploymentStatusMetric, ) -> Self { let block_trigger_count = registry .new_deployment_histogram( @@ -86,13 +90,15 @@ impl SubgraphInstanceMetrics { labels, ) .expect("failed to create blocks_processed_count counter"); + Self { block_trigger_count, block_processing_duration, - trigger_processing_duration, block_ops_transaction_duration, firehose_connection_errors, stopwatch, + deployment_status, + trigger_processing_duration, blocks_processed_secs, blocks_processed_count, } @@ -154,3 +160,56 @@ pub struct RunnerMetrics { /// Sensors to measure the BlockStream metrics pub stream: Arc, } + +/// Reports the current indexing status of a deployment. +#[derive(Clone)] +pub struct DeploymentStatusMetric { + inner: IntGauge, +} + +impl DeploymentStatusMetric { + const STATUS_STARTING: i64 = 1; + const STATUS_RUNNING: i64 = 2; + const STATUS_STOPPED: i64 = 3; + const STATUS_FAILED: i64 = 4; + + /// Registers the metric. + pub fn register(registry: &MetricsRegistry, deployment: &DeploymentLocator) -> Self { + let deployment_status = registry + .new_int_gauge( + "deployment_status", + "Indicates the current indexing status of a deployment.\n\ + Possible values:\n\ + 1 - graph-node is preparing to start indexing;\n\ + 2 - deployment is being indexed;\n\ + 3 - indexing is stopped by request;\n\ + 4 - indexing failed;", + [("deployment", deployment.hash.as_str())], + ) + .expect("failed to register `deployment_status` gauge"); + + Self { + inner: deployment_status, + } + } + + /// Records that the graph-node is preparing to start indexing. + pub fn starting(&self) { + self.inner.set(Self::STATUS_STARTING); + } + + /// Records that the deployment is being indexed. + pub fn running(&self) { + self.inner.set(Self::STATUS_RUNNING); + } + + /// Records that the indexing is stopped by request. + pub fn stopped(&self) { + self.inner.set(Self::STATUS_STOPPED); + } + + /// Records that the indexing failed. + pub fn failed(&self) { + self.inner.set(Self::STATUS_FAILED); + } +} diff --git a/tests/src/fixture/mod.rs b/tests/src/fixture/mod.rs index 4eb5fbb42b1..89184e0164b 100644 --- a/tests/src/fixture/mod.rs +++ b/tests/src/fixture/mod.rs @@ -209,6 +209,10 @@ impl TestContext { let (logger, deployment, raw) = self.get_runner_context().await; let tp: Box> = Box::new(SubgraphTriggerProcessor {}); + let deployment_status_metric = self + .instance_manager + .new_deployment_status_metric(&deployment); + self.instance_manager .build_subgraph_runner( logger, @@ -217,6 +221,7 @@ impl TestContext { raw, Some(stop_block.block_number()), tp, + deployment_status_metric, ) .await .unwrap() @@ -234,6 +239,10 @@ impl TestContext { graph_chain_substreams::TriggerProcessor::new(deployment.clone()), ); + let deployment_status_metric = self + .instance_manager + .new_deployment_status_metric(&deployment); + self.instance_manager .build_subgraph_runner( logger, @@ -242,6 +251,7 @@ impl TestContext { raw, Some(stop_block.block_number()), tp, + deployment_status_metric, ) .await .unwrap() From c202a6dbfa1f0d1bc7a37ec8a5e864b9d8c85df2 Mon Sep 17 00:00:00 2001 From: Ion Suman <47307091+isum@users.noreply.github.com> Date: Thu, 16 Jan 2025 15:32:41 +0000 Subject: [PATCH 2/2] update news with the implemented changes --- NEWS.md | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/NEWS.md b/NEWS.md index ab316821488..3e7dc750745 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,24 @@ # NEWS +## v0.36.1 + +### What's new + +- A new `deployment_status` metric is added [(#5720)](https://github.com/graphprotocol/graph-node/pull/5720) with the + following behavior: + - Once graph-node has figured out that it should index a deployment, `deployment_status` is set to `1` _(starting)_; + - When the block stream is created and blocks are ready to be processed, `deployment_status` is set to `2` _( + running)_; + - When a deployment is unassigned, `deployment_status` is set to `3` _(stopped)_; + - If a temporary or permanent failure occurs, `deployment_status` is set to `4` _(failed)_; + - If indexing manages to recover from a temporary failure, the `deployment_status` is set back to `2` _( + running)_; + +### Breaking changes + +- The `deployment_failed` metric is removed and the failures are reported by the new `deployment_status` + metric. [(#5720)](https://github.com/graphprotocol/graph-node/pull/5720) + ## v0.36.0 ### Note on Firehose Extended Block Details