Skip to content

Commit

Permalink
consistent handling of agregations
Browse files Browse the repository at this point in the history
  • Loading branch information
zorancv committed Jan 29, 2025
1 parent a49edb8 commit c106cba
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 7 deletions.
29 changes: 27 additions & 2 deletions store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,12 @@ use graph::{
};
use itertools::Itertools;

use crate::{catalog, copy::AdaptiveBatchSize, deployment, relational::Table};
use crate::{
catalog,
copy::AdaptiveBatchSize,
deployment,
relational::{Table, VID_COLUMN},
};

use super::{Catalog, Layout, Namespace};

Expand Down Expand Up @@ -68,6 +73,7 @@ struct TablePair {
// has the same name as `src` but is in a different namespace
dst: Arc<Table>,
src_nsp: Namespace,
dst_nsp: Namespace,
}

impl TablePair {
Expand All @@ -94,7 +100,12 @@ impl TablePair {
}
conn.batch_execute(&query)?;

Ok(TablePair { src, dst, src_nsp })
Ok(TablePair {
src,
dst,
src_nsp,
dst_nsp,
})
}

/// Copy all entity versions visible between `earliest_block` and
Expand Down Expand Up @@ -228,6 +239,11 @@ impl TablePair {
let src_qname = &self.src.qualified_name;
let dst_qname = &self.dst.qualified_name;
let src_nsp = &self.src_nsp;
let dst_nsp = &self.dst_nsp;

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let old_vid_form = !self.src.object.is_object_type();
let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
Expand All @@ -237,6 +253,15 @@ impl TablePair {
"src" => src_nsp.as_str(), "error" => e.to_string());
}

// Make sure the vid sequence
// continues from where it was
if old_vid_form {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
)?;
}

writeln!(query, "drop table {src_qname};")?;
writeln!(query, "alter table {dst_qname} set schema {src_nsp}")?;
conn.transaction(|conn| conn.batch_execute(&query))?;
Expand Down
22 changes: 17 additions & 5 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2548,6 +2548,8 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
let out = &mut out;
out.unsafe_to_cache_prepared();

let new_vid_form = self.table.object.is_object_type();

// Construct a query
// insert into schema.table(column, ...)
// values
Expand All @@ -2573,7 +2575,9 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(CAUSALITY_REGION_COLUMN);
};

out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}
out.push_sql(") values\n");

for (i, row) in self.rows.iter().enumerate() {
Expand All @@ -2591,8 +2595,10 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&row.causality_region)?;
};
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
if new_vid_form {
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
}
out.push_sql(")");
}

Expand Down Expand Up @@ -5090,6 +5096,8 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let new_vid_form = self.src.object.is_object_type();

// Construct a query
// insert into {dst}({columns})
// select {columns} from {src}
Expand All @@ -5110,7 +5118,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(")\nselect ");
for column in &self.columns {
Expand Down Expand Up @@ -5176,7 +5186,9 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
out.push_sql(", vid");
if new_vid_form {
out.push_sql(", vid");
}

out.push_sql(" from ");
out.push_sql(self.src.qualified_name.as_str());
Expand Down

0 comments on commit c106cba

Please sign in to comment.