diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 34f6669fca5..669d0ebca3b 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -3,6 +3,7 @@ use diesel::connection::SimpleConnection; use diesel::pg::PgConnection; use diesel::prelude::*; use diesel::r2d2::{ConnectionManager, PooledConnection}; +use graph::anyhow::Context; use graph::blockchain::block_stream::FirehoseCursor; use graph::components::store::{EntityKey, EntityType, PruneReporter, StoredDynamicDataSource}; use graph::components::versions::VERSIONS; @@ -211,7 +212,8 @@ impl DeploymentStore { site: &Site, ) -> Result { let conn = self.get_conn()?; - detail::deployment_entity(&conn, site) + Ok(detail::deployment_entity(&conn, site) + .with_context(|| format!("Deployment details not found for {}", site.deployment))?) } // Remove the data and metadata for the deployment `site`. This operation @@ -1309,92 +1311,98 @@ impl DeploymentStore { // Do any cleanup to bring the subgraph into a known good state if let Some((src, block)) = graft_src { - info!( - logger, - "Initializing graft by copying data from {} to {}", - src.catalog.site.namespace, - dst.catalog.site.namespace - ); - - let src_manifest_idx_and_name = self - .load_deployment(&src.site)? - .manifest - .template_idx_and_name()?; - let dst_manifest_idx_and_name = self - .load_deployment(&dst.site)? - .manifest - .template_idx_and_name()?; - - // Copy subgraph data - // We allow both not copying tables at all from the source, as well - // as adding new tables in `self`; we only need to check that tables - // that actually need to be copied from the source are compatible - // with the corresponding tables in `self` - let copy_conn = crate::copy::Connection::new( - logger, - self.pool.clone(), - src.clone(), - dst.clone(), - block.clone(), - src_manifest_idx_and_name, - dst_manifest_idx_and_name, - )?; - let status = copy_conn.copy_data()?; - if status == crate::copy::Status::Cancelled { - return Err(StoreError::Canceled); - } - let conn = self.get_conn()?; - conn.transaction(|| -> Result<(), StoreError> { - // Copy shared dynamic data sources and adjust their ID; if - // the subgraph uses private data sources, that is done by - // `copy::Connection::copy_data` since it requires access to - // the source schema which in sharded setups is only - // available while that function runs - let start = Instant::now(); - let count = dynds::shared::copy(&conn, &src.site, &dst.site, block.number)?; - info!(logger, "Copied {} dynamic data sources", count; + let dst_block_ptr = Self::block_ptr_with_conn(&conn, dst.site.cheap_clone())?; + + // If the dst block is past the graft point, then the graft has already been completed. + if dst_block_ptr.map(|ptr| ptr.number) < Some(block.number) { + info!( + logger, + "Initializing graft by copying data from {} to {}", + src.catalog.site.namespace, + dst.catalog.site.namespace + ); + + let src_manifest_idx_and_name = self + .load_deployment(&src.site)? + .manifest + .template_idx_and_name()?; + let dst_manifest_idx_and_name = self + .load_deployment(&dst.site)? + .manifest + .template_idx_and_name()?; + + // Copy subgraph data + // We allow both not copying tables at all from the source, as well + // as adding new tables in `self`; we only need to check that tables + // that actually need to be copied from the source are compatible + // with the corresponding tables in `self` + let copy_conn = crate::copy::Connection::new( + logger, + self.pool.clone(), + src.clone(), + dst.clone(), + block.clone(), + src_manifest_idx_and_name, + dst_manifest_idx_and_name, + )?; + let status = copy_conn.copy_data()?; + if status == crate::copy::Status::Cancelled { + return Err(StoreError::Canceled); + } + + let conn = self.get_conn()?; + conn.transaction(|| -> Result<(), StoreError> { + // Copy shared dynamic data sources and adjust their ID; if + // the subgraph uses private data sources, that is done by + // `copy::Connection::copy_data` since it requires access to + // the source schema which in sharded setups is only + // available while that function runs + let start = Instant::now(); + let count = dynds::shared::copy(&conn, &src.site, &dst.site, block.number)?; + info!(logger, "Copied {} dynamic data sources", count; "time_ms" => start.elapsed().as_millis()); - // Copy errors across - let start = Instant::now(); - let count = deployment::copy_errors(&conn, &src.site, &dst.site, &block)?; - info!(logger, "Copied {} existing errors", count; + // Copy errors across + let start = Instant::now(); + let count = deployment::copy_errors(&conn, &src.site, &dst.site, &block)?; + info!(logger, "Copied {} existing errors", count; "time_ms" => start.elapsed().as_millis()); - catalog::copy_account_like(&conn, &src.site, &dst.site)?; - - // Rewind the subgraph so that entity versions that are - // clamped in the future (beyond `block`) become valid for - // all blocks after `block`. `revert_block` gets rid of - // everything including the block passed to it. We want to - // preserve `block` and therefore revert `block+1` - let start = Instant::now(); - let block_to_revert: BlockNumber = block - .number - .checked_add(1) - .expect("block numbers fit into an i32"); - dst.revert_block(&conn, block_to_revert)?; - info!(logger, "Rewound subgraph to block {}", block.number; + catalog::copy_account_like(&conn, &src.site, &dst.site)?; + + // Rewind the subgraph so that entity versions that are + // clamped in the future (beyond `block`) become valid for + // all blocks after `block`. `revert_block` gets rid of + // everything including the block passed to it. We want to + // preserve `block` and therefore revert `block+1` + let start = Instant::now(); + let block_to_revert: BlockNumber = block + .number + .checked_add(1) + .expect("block numbers fit into an i32"); + dst.revert_block(&conn, block_to_revert)?; + info!(logger, "Rewound subgraph to block {}", block.number; "time_ms" => start.elapsed().as_millis()); - let start = Instant::now(); - deployment::set_entity_count(&conn, &dst.site, &dst.count_query)?; - info!(logger, "Counted the entities"; + let start = Instant::now(); + deployment::set_entity_count(&conn, &dst.site, &dst.count_query)?; + info!(logger, "Counted the entities"; "time_ms" => start.elapsed().as_millis()); - // Analyze all tables for this deployment - for entity_name in dst.tables.keys() { - self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), &conn)?; - } + // Analyze all tables for this deployment + for entity_name in dst.tables.keys() { + self.analyze_with_conn(site.cheap_clone(), entity_name.as_str(), &conn)?; + } - // Set the block ptr to the graft point to signal that we successfully - // performed the graft - crate::deployment::forward_block_ptr(&conn, &dst.site.deployment, &block)?; - info!(logger, "Subgraph successfully initialized"; + // Set the block ptr to the graft point to signal that we successfully + // performed the graft + crate::deployment::forward_block_ptr(&conn, &dst.site.deployment, &block)?; + info!(logger, "Subgraph successfully initialized"; "time_ms" => start.elapsed().as_millis()); - Ok(()) - })?; + Ok(()) + })?; + } } // Make sure the block pointer is set. This is important for newly // deployed subgraphs so that we respect the 'startBlock' setting