From c106cba9fc881d4a42e5013b6a72af555e27441c Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Wed, 29 Jan 2025 23:28:19 +0200 Subject: [PATCH] consistent handling of agregations --- store/postgres/src/relational/prune.rs | 29 ++++++++++++++++++++++-- store/postgres/src/relational_queries.rs | 22 ++++++++++++++---- 2 files changed, 44 insertions(+), 7 deletions(-) diff --git a/store/postgres/src/relational/prune.rs b/store/postgres/src/relational/prune.rs index 39337d2a485..aff5a8b64f9 100644 --- a/store/postgres/src/relational/prune.rs +++ b/store/postgres/src/relational/prune.rs @@ -17,7 +17,12 @@ use graph::{ }; use itertools::Itertools; -use crate::{catalog, copy::AdaptiveBatchSize, deployment, relational::Table}; +use crate::{ + catalog, + copy::AdaptiveBatchSize, + deployment, + relational::{Table, VID_COLUMN}, +}; use super::{Catalog, Layout, Namespace}; @@ -68,6 +73,7 @@ struct TablePair { // has the same name as `src` but is in a different namespace dst: Arc, src_nsp: Namespace, + dst_nsp: Namespace, } impl TablePair { @@ -94,7 +100,12 @@ impl TablePair { } conn.batch_execute(&query)?; - Ok(TablePair { src, dst, src_nsp }) + Ok(TablePair { + src, + dst, + src_nsp, + dst_nsp, + }) } /// Copy all entity versions visible between `earliest_block` and @@ -228,6 +239,11 @@ impl TablePair { let src_qname = &self.src.qualified_name; let dst_qname = &self.dst.qualified_name; let src_nsp = &self.src_nsp; + let dst_nsp = &self.dst_nsp; + + let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name); + + let old_vid_form = !self.src.object.is_object_type(); let mut query = String::new(); // What we are about to do would get blocked by autovacuum on our @@ -237,6 +253,15 @@ impl TablePair { "src" => src_nsp.as_str(), "error" => e.to_string()); } + // Make sure the vid sequence + // continues from where it was + if old_vid_form { + writeln!( + query, + "select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));" + )?; + } + writeln!(query, "drop table {src_qname};")?; writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?; conn.transaction(|conn| conn.batch_execute(&query))?; diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 61f0c80a3d8..180344f5430 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2548,6 +2548,8 @@ impl<'a> QueryFragment for InsertQuery<'a> { let out = &mut out; out.unsafe_to_cache_prepared(); + let new_vid_form = self.table.object.is_object_type(); + // Construct a query // insert into schema.table(column, ...) // values @@ -2573,7 +2575,9 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(CAUSALITY_REGION_COLUMN); }; - out.push_sql(", vid"); + if new_vid_form { + out.push_sql(", vid"); + } out.push_sql(") values\n"); for (i, row) in self.rows.iter().enumerate() { @@ -2591,8 +2595,10 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(", "); out.push_bind_param::(&row.causality_region)?; }; - out.push_sql(", "); - out.push_bind_param::(&row.vid)?; + if new_vid_form { + out.push_sql(", "); + out.push_bind_param::(&row.vid)?; + } out.push_sql(")"); } @@ -5090,6 +5096,8 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> { out.unsafe_to_cache_prepared(); + let new_vid_form = self.src.object.is_object_type(); + // Construct a query // insert into {dst}({columns}) // select {columns} from {src} @@ -5110,7 +5118,9 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { out.push_sql(", "); out.push_sql(CAUSALITY_REGION_COLUMN); }; - out.push_sql(", vid"); + if new_vid_form { + out.push_sql(", vid"); + } out.push_sql(")\nselect "); for column in &self.columns { @@ -5176,7 +5186,9 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { )); } } - out.push_sql(", vid"); + if new_vid_form { + out.push_sql(", vid"); + } out.push_sql(" from "); out.push_sql(self.src.qualified_name.as_str());