diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs
index df0666a4f6a..ea42d2ff503 100644
--- a/core/src/subgraph/context/mod.rs
+++ b/core/src/subgraph/context/mod.rs
@@ -58,6 +58,10 @@ impl SubgraphKeepAlive {
             self.sg_metrics.running_count.inc();
         }
     }
+
+    pub fn contains(&self, deployment_id: &DeploymentId) -> bool {
+        self.alive_map.read().unwrap().contains_key(deployment_id)
+    }
 }
 
 // The context keeps track of mutable in-memory state that is retained across blocks.
diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs
index aba94ec2d3a..b255e9b3a0b 100644
--- a/core/src/subgraph/instance_manager.rs
+++ b/core/src/subgraph/instance_manager.rs
@@ -1,3 +1,6 @@
+use std::sync::atomic::AtomicU64;
+use std::sync::atomic::Ordering;
+
 use crate::polling_monitor::{ArweaveService, IpfsService};
 use crate::subgraph::context::{IndexingContext, SubgraphKeepAlive};
 use crate::subgraph::inputs::IndexingInputs;
@@ -22,6 +25,7 @@ use tokio::task;
 
 use super::context::OffchainMonitor;
 use super::SubgraphTriggerProcessor;
+use crate::subgraph::runner::SubgraphRunnerError;
 
 #[derive(Clone)]
 pub struct SubgraphInstanceManager<S: SubgraphStore> {
@@ -35,6 +39,18 @@ pub struct SubgraphInstanceManager<S: SubgraphStore> {
     arweave_service: ArweaveService,
     static_filters: bool,
     env_vars: Arc<EnvVars>,
+
+    /// By design, there should be only one subgraph runner process per subgraph, but the current
+    /// implementation does not completely prevent multiple runners from being active at the same
+    /// time, and we have already had a [bug][0] due to this limitation. Investigating the problem
+    /// was complicated because there was no way to tell that the logs were coming from two
+    /// different processes; they all looked the same. Ideally, the implementation should be
+    /// refactored to make it more strict, but until then, we keep this counter, which is
+    /// incremented each time a new runner is started, and the previous count is embedded in
+    /// each log of the started runner, to make debugging future issues easier.
+    ///
+    /// [0]: https://github.com/graphprotocol/graph-node/issues/5452
+    subgraph_start_counter: Arc<AtomicU64>,
 }
 
 #[async_trait]
@@ -45,7 +61,11 @@ impl<S: SubgraphStore> SubgraphInstanceManagerTrait for SubgraphInstanceManager
         manifest: serde_yaml::Mapping,
         stop_block: Option<BlockNumber>,
     ) {
+        let runner_index = self.subgraph_start_counter.fetch_add(1, Ordering::SeqCst);
+
         let logger = self.logger_factory.subgraph_logger(&loc);
+        let logger = logger.new(o!("runner_index" => runner_index));
+
         let err_logger = logger.clone();
         let instance_manager = self.cheap_clone();
 
@@ -185,6 +205,7 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
             static_filters,
             env_vars,
             arweave_service,
+            subgraph_start_counter: Arc::new(AtomicU64::new(0)),
         }
     }
 
@@ -491,13 +512,18 @@ impl<S: SubgraphStore> SubgraphInstanceManager<S> {
         // it has a dedicated OS thread so the OS will handle the preemption. See
         // https://github.com/tokio-rs/tokio/issues/3493.
         graph::spawn_thread(deployment.to_string(), move || {
-            if let Err(e) = graph::block_on(task::unconstrained(runner.run())) {
-                error!(
-                    &logger,
-                    "Subgraph instance failed to run: {}",
-                    format!("{:#}", e)
-                );
+            match graph::block_on(task::unconstrained(runner.run())) {
+                Ok(()) => {}
+                Err(SubgraphRunnerError::Duplicate) => {
+                    // We do not need to unregister metrics because they are unique per subgraph
+                    // and another runner is still active.
+                    return;
+                }
+                Err(err) => {
+                    error!(&logger, "Subgraph instance failed to run: {:#}", err);
+                }
             }
+
             subgraph_metrics_unregister.unregister(registry);
         });
 
diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs
index 9b81c420ec2..a0269863614 100644
--- a/core/src/subgraph/runner.rs
+++ b/core/src/subgraph/runner.rs
@@ -52,6 +52,15 @@ where
     pub metrics: RunnerMetrics,
 }
 
+#[derive(Debug, thiserror::Error)]
+pub enum SubgraphRunnerError {
+    #[error("subgraph runner terminated because a newer one was active")]
+    Duplicate,
+
+    #[error(transparent)]
+    Unknown(#[from] Error),
+}
+
 impl<C, T> SubgraphRunner<C, T>
 where
     C: Blockchain,
@@ -109,7 +118,7 @@ where
 
     #[cfg(debug_assertions)]
     pub async fn run_for_test(self, break_on_restart: bool) -> Result<Self, Error> {
-        self.run_inner(break_on_restart).await
+        self.run_inner(break_on_restart).await.map_err(Into::into)
     }
 
     fn is_static_filters_enabled(&self) -> bool {
@@ -166,11 +175,11 @@ where
         self.build_filter()
     }
 
-    pub async fn run(self) -> Result<(), Error> {
+    pub async fn run(self) -> Result<(), SubgraphRunnerError> {
         self.run_inner(false).await.map(|_| ())
     }
 
-    async fn run_inner(mut self, break_on_restart: bool) -> Result<Self, Error> {
+    async fn run_inner(mut self, break_on_restart: bool) -> Result<Self, SubgraphRunnerError> {
         // If a subgraph failed for deterministic reasons, before start indexing, we first
         // revert the deployment head. It should lead to the same result since the error was
         // deterministic.
@@ -246,7 +255,8 @@ where
             // TODO: move cancel handle to the Context
            // This will require some code refactor in how the BlockStream is created
             let block_start = Instant::now();
-            match self
+
+            let action = self
                 .handle_stream_event(event, &block_stream_cancel_handle)
                 .await
                 .map(|res| {
@@ -254,7 +264,30 @@ where
                         .subgraph
                         .observe_block_processed(block_start.elapsed(), res.block_finished());
                     res
-                })? {
+                })?;
+
+            // It is possible that the subgraph was unassigned, but the runner was in
+            // a retry delay state and did not observe the cancel signal.
+            if block_stream_cancel_handle.is_canceled() {
+                // It is also possible that the runner was in a retry delay state while
+                // the subgraph was reassigned and a new runner was started.
+                if self.ctx.instances.contains(&self.inputs.deployment.id) {
+                    warn!(
+                        self.logger,
+                        "Terminating the subgraph runner because a newer one is active. \
+                         Possible reassignment detected while the runner was in a non-cancellable pending state",
+                    );
+                    return Err(SubgraphRunnerError::Duplicate);
+                }
+
+                warn!(
+                    self.logger,
+                    "Terminating the subgraph runner because the subgraph was unassigned",
+                );
+                return Ok(self);
+            }
+
+            match action {
                 Action::Continue => continue,
                 Action::Stop => {
                     info!(self.logger, "Stopping subgraph");
@@ -1579,6 +1612,12 @@ where
     }
 }
 
+impl From<StoreError> for SubgraphRunnerError {
+    fn from(err: StoreError) -> Self {
+        Self::Unknown(err.into())
+    }
+}
+
 /// Transform the proof of indexing changes into entity updates that will be
 /// inserted when as_modifications is called.
 async fn update_proof_of_indexing(
diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs
index 4bcf434b6ec..99ccfd02217 100644
--- a/store/postgres/src/writable.rs
+++ b/store/postgres/src/writable.rs
@@ -17,7 +17,7 @@ use graph::prelude::{
     SubgraphStore as _, BLOCK_NUMBER_MAX,
 };
 use graph::schema::{EntityKey, EntityType, InputSchema};
-use graph::slog::{info, warn};
+use graph::slog::{debug, info, warn};
 use graph::tokio::select;
 use graph::tokio::sync::Notify;
 use graph::tokio::task::JoinHandle;
@@ -936,6 +936,7 @@ impl Queue {
                     // Graceful shutdown. We also handled the request
                    // successfully
                     queue.queue.pop().await;
+                    debug!(logger, "Subgraph writer has processed a stop request");
                     return;
                 }
                 Ok(Err(e)) => {
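
Note: the runner-counter pattern documented on `subgraph_start_counter` generalizes beyond graph-node. Below is a minimal, self-contained sketch of the same idea, assuming the `slog`, `slog-term`, and `slog-async` crates; the `Manager` type and `start_runner` function are illustrative names, not graph-node APIs.

    // Each start increments a shared counter, and the previous value is
    // attached to every log line of that runner via a child logger key,
    // so interleaved logs from two runners can be told apart.
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    use slog::{info, o, Drain, Logger};

    struct Manager {
        start_counter: Arc<AtomicU64>,
    }

    impl Manager {
        fn start_runner(&self, root: &Logger) {
            // SeqCst mirrors the diff; Relaxed would also suffice for a counter.
            let runner_index = self.start_counter.fetch_add(1, Ordering::SeqCst);
            let logger = root.new(o!("runner_index" => runner_index));
            info!(logger, "Subgraph runner started");
        }
    }

    fn main() {
        let decorator = slog_term::TermDecorator::new().build();
        let drain = slog_term::FullFormat::new(decorator).build().fuse();
        let drain = slog_async::Async::new(drain).build().fuse();
        let root = Logger::root(drain, o!());

        let mgr = Manager {
            start_counter: Arc::new(AtomicU64::new(0)),
        };
        mgr.start_runner(&root); // logged with runner_index: 0
        mgr.start_runner(&root); // logged with runner_index: 1
    }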
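
Likewise, the post-event cancellation check added to `run_inner` can be shown in isolation. This is a sketch under stated assumptions: a plain `AtomicBool` stands in for graph-node's cancel handle and a `HashSet` for the `SubgraphKeepAlive` registry; `RunnerError` and `check_cancellation` are hypothetical names.

    use std::collections::HashSet;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[derive(Debug)]
    enum RunnerError {
        // A newer runner has taken over this deployment.
        Duplicate,
    }

    // Returns Ok(true) to stop cleanly, Ok(false) to keep running, and
    // Err(Duplicate) when another runner now owns the deployment.
    fn check_cancellation(
        canceled: &AtomicBool,
        instances: &HashSet<u64>,
        deployment_id: u64,
    ) -> Result<bool, RunnerError> {
        // The cancel signal may have arrived while the runner slept in a retry
        // delay, so it must be re-checked after every handled event.
        if canceled.load(Ordering::SeqCst) {
            if instances.contains(&deployment_id) {
                // Still registered: a newer runner was started for the same
                // deployment, so this one must bail out without cleanup.
                return Err(RunnerError::Duplicate);
            }
            // No longer registered: the subgraph was simply unassigned.
            return Ok(true);
        }
        Ok(false)
    }

    fn main() {
        let canceled = AtomicBool::new(true);
        let mut instances = HashSet::new();
        instances.insert(42u64);

        // Canceled while still registered: a newer runner must be active.
        assert!(matches!(
            check_cancellation(&canceled, &instances, 42),
            Err(RunnerError::Duplicate)
        ));
    }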