From 10d259af0c650b972b008a640aeeb0cfb33e10f9 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 25 Jan 2024 12:20:38 -0600 Subject: [PATCH] fix: schema issue within writebuilder (#2106) # Description The schema when using `with_input_execution_plan` wasn't being applied. # Related Issue(s) closes https://github.com/delta-io/delta-rs/issues/2105 # Documentation --- crates/deltalake-core/src/operations/write.rs | 4 +- .../tests/integration_datafusion.rs | 59 ++++++++++++++++++- 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/crates/deltalake-core/src/operations/write.rs b/crates/deltalake-core/src/operations/write.rs index bf0ca86d86..bf36b32459 100644 --- a/crates/deltalake-core/src/operations/write.rs +++ b/crates/deltalake-core/src/operations/write.rs @@ -403,14 +403,13 @@ impl std::future::IntoFuture for WriteBuilder { Ok(this.partition_columns.unwrap_or_default()) }?; - let mut schema: ArrowSchemaRef = arrow_schema::Schema::empty().into(); let plan = if let Some(plan) = this.input { Ok(plan) } else if let Some(batches) = this.batches { if batches.is_empty() { Err(WriteError::MissingData) } else { - schema = batches[0].schema(); + let schema = batches[0].schema(); if let Some(snapshot) = &this.snapshot { let table_schema = snapshot @@ -460,6 +459,7 @@ impl std::future::IntoFuture for WriteBuilder { } else { Err(WriteError::MissingData) }?; + let schema = plan.schema(); let state = match this.state { Some(state) => state, diff --git a/crates/deltalake-core/tests/integration_datafusion.rs b/crates/deltalake-core/tests/integration_datafusion.rs index 4cc7c5a37c..f0138f85b3 100644 --- a/crates/deltalake-core/tests/integration_datafusion.rs +++ b/crates/deltalake-core/tests/integration_datafusion.rs @@ -45,7 +45,8 @@ use std::error::Error; mod local { use datafusion::common::stats::Precision; - use deltalake_core::writer::JsonWriter; + use deltalake_core::{logstore::default_logstore, writer::JsonWriter}; + use object_store::local::LocalFileSystem; use super::*; #[tokio::test] @@ -1071,6 +1072,62 @@ mod local { Ok(()) } + + #[tokio::test] + async fn test_issue_2105() -> Result<()> { + use datafusion::arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; + let path = tempfile::tempdir().unwrap(); + let path = path.into_path(); + + let file_store = LocalFileSystem::new_with_prefix(path.clone()).unwrap(); + let log_store = default_logstore( + Arc::new(file_store), + &Url::from_file_path(path.clone()).unwrap(), + &Default::default(), + ); + + let tbl = CreateBuilder::new() + .with_log_store(log_store.clone()) + .with_save_mode(SaveMode::Overwrite) + .with_table_name("test") + .with_column( + "id", + DataType::Primitive(PrimitiveType::Integer), + true, + None, + ); + let tbl = tbl.await.unwrap(); + let ctx = SessionContext::new(); + let plan = ctx + .sql("SELECT 1 as id") + .await + .unwrap() + .create_physical_plan() + .await + .unwrap(); + let write_builder = WriteBuilder::new(log_store, tbl.state); + let _ = write_builder + .with_input_execution_plan(plan) + .with_save_mode(SaveMode::Overwrite) + .await + .unwrap(); + + let table = open_table(path.to_str().unwrap()).await.unwrap(); + let prov: Arc = Arc::new(table); + ctx.register_table("test", prov).unwrap(); + let mut batches = ctx + .sql("SELECT * FROM test") + .await + .unwrap() + .collect() + .await + .unwrap(); + let batch = batches.pop().unwrap(); + + let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int32, true)]); + assert_eq!(batch.schema().as_ref(), &expected_schema); + Ok(()) + } } async fn test_datafusion(context: &IntegrationContext) -> TestResult {