feat: support add all null column as metadata-only operation via sql #3504

Merged
merged 5 commits on Mar 3, 2025
Changes from 1 commit
feat: support add all null column as metadata-only operation via sql
albertlockett committed Mar 3, 2025
commit 88443f224f1bf93f10ec14ef3d05cbf15431d56e
51 changes: 40 additions & 11 deletions rust/lance/src/dataset/schema_evolution.rs
@@ -23,6 +23,12 @@ use super::{
Dataset,
};

mod optimize;

use optimize::{
ChainedNewColumnTransformOptimizer, NewColumnTransformOptimizer, SqlToAllNullsOptimizer,
};

#[derive(Debug, Clone, PartialEq)]
pub struct BatchInfo {
pub fragment_id: u32,
@@ -149,6 +155,16 @@ pub(super) async fn add_columns_to_fragments(
Ok(())
};

// The AllNulls transform cannot be performed on legacy files
let has_legacy_files = has_legacy_files(fragments);

// Optimize the transforms
let mut optimizer = ChainedNewColumnTransformOptimizer::new(vec![]);
if has_legacy_files {
Contributor

Should this be !has_legacy_files? Or am I misunderstanding the above comment?

Contributor Author

Yeah, there should have been a ! here. I caught this after adding tests.

optimizer.add_optimizer(Box::new(SqlToAllNullsOptimizer::new()));
}
let transforms = optimizer.optimize(dataset, transforms)?;

let (output_schema, fragments) = match transforms {
NewColumnTransform::BatchUDF(udf) => {
check_names(udf.output_schema.as_ref())?;
@@ -262,17 +278,7 @@ pub(super) async fn add_columns_to_fragments(
// can't add all-null columns as a metadata-only operation. The reason is because we
// use the NullReader for fragments that have missing columns and we can't mix legacy
// and non-legacy readers when reading the fragment.
if fragments.iter().any(|fragment| {
fragment.files.iter().any(|file| {
matches!(
LanceFileVersion::try_from_major_minor(
file.file_major_version,
file.file_minor_version
),
Ok(LanceFileVersion::Legacy)
)
})
}) {
if has_legacy_files {
return Err(Error::NotSupported {
source: "Cannot add all-null columns to legacy dataset version.".into(),
location: location!(),
@@ -289,6 +295,24 @@ pub(super) async fn add_columns_to_fragments(
Ok((fragments, schema))
}

fn has_legacy_files(fragments: &[FileFragment]) -> bool {
let has_legacy_files = fragments
.iter()
.map(|f| &f.metadata)
.flat_map(|fragment_meta| fragment_meta.files.iter())
.any(|file_meta| {
matches!(
LanceFileVersion::try_from_major_minor(
file_meta.file_major_version,
file_meta.file_minor_version
),
Ok(LanceFileVersion::Legacy)
)
});

!has_legacy_files
}
Contributor

Might be easier to just use dataset.is_legacy_storage().

Contributor Author

Thanks. I wasn't aware of that method.
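
A minimal sketch of the simplification the reviewer suggests, assuming `Dataset::is_legacy_storage()` reports whether the dataset still uses the legacy file format (the method name comes from the reviewer's comment; its exact signature is not shown in this diff). With it, the hand-rolled fragment scan and the inverted boolean could be replaced by a direct check:

// Hypothetical rewrite based on the reviewer's suggestion; assumes
// `Dataset::is_legacy_storage()` returns true for legacy-format datasets.
if dataset.is_legacy_storage() {
    return Err(Error::NotSupported {
        source: "Cannot add all-null columns to legacy dataset version.".into(),
        location: location!(),
    });
}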


pub(super) async fn add_columns(
dataset: &mut Dataset,
transforms: NewColumnTransform,
@@ -1744,4 +1768,9 @@ mod test {

Ok(())
}

#[test]
fn test_new_column_sql_to_all_nulls_transform_optimizer() {
// TODO: write a test to ensure the optimizer for all null sql gets used
Contributor Author

If we're OK with the general approach in this PR, I'm planning to fill in this test before we merge.

}
}
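
A hedged sketch of one way the placeholder test above might be filled in, using only the types introduced in this commit (`ChainedNewColumnTransformOptimizer`, `SqlToAllNullsOptimizer`, `NewColumnTransform`) plus the in-memory dataset pattern from the test at the bottom of optimize.rs; the usual arrow imports are assumed, and the author's final test may assert something different (for example, that no new data files are written):

#[tokio::test]
async fn test_new_column_sql_to_all_nulls_transform_optimizer() {
    // Sketch only: build an empty in-memory dataset and check that the chained
    // optimizer rewrites a NULL-cast SQL transform into an AllNulls transform.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
    let dataset = Dataset::write(empty_reader, "memory://", None).await.unwrap();

    let mut optimizer = ChainedNewColumnTransformOptimizer::new(vec![]);
    optimizer.add_optimizer(Box::new(SqlToAllNullsOptimizer::new()));

    let transform = NewColumnTransform::SqlExpressions(vec![(
        "new_col".to_string(),
        "CAST(NULL AS int)".to_string(),
    )]);
    let optimized = optimizer.optimize(&dataset, transform).unwrap();
    assert!(matches!(optimized, NewColumnTransform::AllNulls(_)));
}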
150 changes: 150 additions & 0 deletions rust/lance/src/dataset/schema_evolution/optimize.rs
@@ -0,0 +1,150 @@
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::Expr;
use datafusion::scalar::ScalarValue;
use lance_datafusion::planner::Planner;

use crate::error::Result;
use crate::Dataset;

use super::NewColumnTransform;

/// Optimizes a `NewColumnTransform` into a more efficient, equivalent form.
pub(super) trait NewColumnTransformOptimizer: Send + Sync {
/// Optimize the passed `NewColumnTransform` to a more efficient form.
fn optimize(
&self,
dataset: &Dataset,
transform: NewColumnTransform,
) -> Result<NewColumnTransform>;
}

/// A `NewColumnTransformOptimizer` that chains multiple `NewColumnTransformOptimizer`s together.
pub(super) struct ChainedNewColumnTransformOptimizer {
optimizers: Vec<Box<dyn NewColumnTransformOptimizer>>,
}

impl ChainedNewColumnTransformOptimizer {
pub(super) fn new(optimizers: Vec<Box<dyn NewColumnTransformOptimizer>>) -> Self {
Self { optimizers }
}

pub(super) fn add_optimizer(&mut self, optimizer: Box<dyn NewColumnTransformOptimizer>) {
self.optimizers.push(optimizer);
}
}
Comment on lines +16 to +39
Contributor

I'm not opposed to this pattern but it seems slightly heavier than we need for this particular fix. Do you anticipate additional optimizers?

Contributor Author (@albertlockett, Mar 3, 2025)

Yeah, I recognize that. I went back and forth quite a bit on how/where to add the actual core optimization logic.

My thinking was that the alternatives could have been:

  • run the optimization logic inline in add_columns_to_fragments, but that method is already pretty long, so it would be better to extract the optimization somewhere else.
  • do the optimization higher up in the call stack, but I thought there might eventually be additional context we compute just before applying the transform, so it's best to keep the optimization near where that context would be available.
  • add some ad-hoc functions instead of the trait, but if we eventually make additional optimizations to the transform, then without some structure around where those functions get invoked, figuring out the actual state of the transformation could get messy.

I don't have concrete plans to add more optimizers, but I think it's possible we could in the future. As a hypothetical example, maybe there are additional SQL patterns we could recognize and rewrite as a more optimal UDF. Given that this could happen, I thought it was worthwhile to put at least a bit of structure around how these optimizers are organized.
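
To illustrate the extensibility being discussed, here is a minimal sketch of how a hypothetical future optimizer would slot into the chain, reusing the trait defined in this file. The struct name and any rewrite it would perform are invented placeholders, not part of this PR:

// Hypothetical future optimizer; only the trait wiring mirrors this PR, the
// name and the rewrite logic are placeholders.
pub(super) struct SqlToUdfOptimizer;

impl NewColumnTransformOptimizer for SqlToUdfOptimizer {
    fn optimize(
        &self,
        _dataset: &Dataset,
        transform: NewColumnTransform,
    ) -> Result<NewColumnTransform> {
        // A real implementation would pattern-match on `transform` and return a
        // cheaper equivalent; this sketch just passes the transform through.
        Ok(transform)
    }
}

// Chaining it alongside the all-nulls optimizer:
// let mut optimizer = ChainedNewColumnTransformOptimizer::new(vec![]);
// optimizer.add_optimizer(Box::new(SqlToAllNullsOptimizer::new()));
// optimizer.add_optimizer(Box::new(SqlToUdfOptimizer));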


/// A `NewColumnTransformOptimizer` that chains multiple `NewColumnTransformOptimizer`s together.
impl NewColumnTransformOptimizer for ChainedNewColumnTransformOptimizer {
fn optimize(
&self,
dataset: &Dataset,
transform: NewColumnTransform,
) -> Result<NewColumnTransform> {
let mut transform = transform;
for optimizer in &self.optimizers {
transform = optimizer.optimize(dataset, transform)?;
}
Ok(transform)
}
}

/// Optimizes a `NewColumnTransform::SqlExpressions` transform into a `NewColumnTransform::AllNulls`
/// transform when every SQL expression is simply a cast of `NULL`. For example,
/// `NewColumnTransform::SqlExpressions(vec![("new_col", "CAST(NULL AS int)")])`
/// would be optimized to
/// `NewColumnTransform::AllNulls(Schema::new(vec![Field::new("new_col", DataType::Int32, true)]))`.
///
pub(super) struct SqlToAllNullsOptimizer;

impl SqlToAllNullsOptimizer {
pub(super) fn new() -> Self {
Self
}

fn is_all_null(&self, expr: &Expr) -> AllNullsResult {
match expr {
Expr::Cast(cast) => {
if matches!(cast.expr.as_ref(), Expr::Literal(ScalarValue::Null)) {
let data_type = cast.data_type.clone();
AllNullsResult::AllNulls(data_type)
} else {
AllNullsResult::NotAllNulls
}
}
_ => AllNullsResult::NotAllNulls,
}
}
}

enum AllNullsResult {
AllNulls(DataType),
NotAllNulls,
}

impl NewColumnTransformOptimizer for SqlToAllNullsOptimizer {
fn optimize(
&self,
dataset: &Dataset,
transform: NewColumnTransform,
) -> Result<NewColumnTransform> {
match &transform {
NewColumnTransform::SqlExpressions(expressions) => {
let arrow_schema = Arc::new(Schema::from(dataset.schema()));
let planner = Planner::new(arrow_schema);
let mut all_null_schema_fields = vec![];
for (name, expr) in expressions {
let expr = planner.parse_expr(expr)?;
if let AllNullsResult::AllNulls(data_type) = self.is_all_null(&expr) {
let field = Field::new(name, data_type, true);
all_null_schema_fields.push(field);
} else {
return Ok(transform);
}
}

let all_null_schema = Schema::new(all_null_schema_fields);
Ok(NewColumnTransform::AllNulls(Arc::new(all_null_schema)))
}
_ => Ok(transform),
}
}
}

#[cfg(test)]
mod test {
use super::*;

use arrow_array::RecordBatchIterator;

#[tokio::test]
async fn test_sql_to_all_null_transform() {
let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
let empty_reader = RecordBatchIterator::new(vec![], schema.clone());
let dataset = Arc::new(
Dataset::write(empty_reader, "memory://", None)
.await
.unwrap(),
);

let original = NewColumnTransform::SqlExpressions(vec![
("new_col1".to_string(), "CAST(NULL AS int)".to_string()),
("new_col2".to_string(), "CAST(NULL AS bigint)".to_string()),
]);

let optimizer = SqlToAllNullsOptimizer::new();
let result = optimizer.optimize(&dataset, original).unwrap();

assert!(matches!(result, NewColumnTransform::AllNulls(_)));
if let NewColumnTransform::AllNulls(schema) = result {
assert_eq!(schema.fields().len(), 2);
assert_eq!(schema.field(0).name(), "new_col1");
assert_eq!(schema.field(0).data_type(), &DataType::Int32);
assert!(schema.field(0).is_nullable());
assert_eq!(schema.field(1).name(), "new_col2");
assert_eq!(schema.field(1).data_type(), &DataType::Int64);
assert!(schema.field(1).is_nullable());
}
}
}