From 362a94e60744b64a8ad985e5d77e9d9aa7a7518b Mon Sep 17 00:00:00 2001
From: Robert Pack <42610831+roeap@users.noreply.github.com>
Date: Fri, 14 Apr 2023 04:18:48 +0200
Subject: [PATCH] feat: read schema from parquet files in datafusion scans
 (#1266)

# Description
This PR updates datafusion table scans to read the file schema from the
parquet file referenced by the latest add action of the table. This works
around issues where the schema we derive from the table metadata does not
match the data in the parquet files - e.g. nanosecond vs. microsecond
timestamps.

We also update the `Load` command to handle column selections and make it
more consistent with the other operations. A short usage sketch for the
updated load API is appended after the diff.

# Related Issue(s)
closes #441

# Documentation

---------

Co-authored-by: Will Jones
---
 rust/src/delta_datafusion.rs             |   7 +-
 rust/src/operations/load.rs              | 114 ++++++++++++-----------
 rust/src/operations/mod.rs               |   2 +-
 rust/src/operations/transaction/state.rs |  39 ++++++++
 rust/tests/datafusion_test.rs            |  35 ++++++-
 5 files changed, 140 insertions(+), 57 deletions(-)

diff --git a/rust/src/delta_datafusion.rs b/rust/src/delta_datafusion.rs
index 029a4974a1..9feb617470 100644
--- a/rust/src/delta_datafusion.rs
+++ b/rust/src/delta_datafusion.rs
@@ -376,9 +376,10 @@ impl TableProvider for DeltaTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let schema = Arc::new(<ArrowSchema as TryFrom<&schema::Schema>>::try_from(
-            DeltaTable::schema(self).unwrap(),
-        )?);
+        let schema = self
+            .state
+            .physical_arrow_schema(self.object_store())
+            .await?;
 
         register_store(self, session.runtime_env().clone());
 
diff --git a/rust/src/operations/load.rs b/rust/src/operations/load.rs
index ea4c5e49dd..2be717e549 100644
--- a/rust/src/operations/load.rs
+++ b/rust/src/operations/load.rs
@@ -1,71 +1,40 @@
-use std::collections::HashMap;
 use std::sync::Arc;
 
-use crate::storage::DeltaObjectStore;
-use crate::{builder::ensure_table_uri, DeltaResult, DeltaTable};
-
 use datafusion::datasource::TableProvider;
 use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use futures::future::BoxFuture;
 
+use crate::storage::DeltaObjectStore;
+use crate::table_state::DeltaTableState;
+use crate::{DeltaResult, DeltaTable, DeltaTableError};
+
 #[derive(Debug, Clone)]
 pub struct LoadBuilder {
-    location: Option<String>,
+    /// A snapshot of the to-be-loaded table's state
+    snapshot: DeltaTableState,
+    /// Delta object store for handling data files
+    store: Arc<DeltaObjectStore>,
+    /// A sub-selection of columns to be loaded
     columns: Option<Vec<String>>,
-    storage_options: Option<HashMap<String, String>>,
-    object_store: Option<Arc<DeltaObjectStore>>,
-}
-
-impl Default for LoadBuilder {
-    fn default() -> Self {
-        Self::new()
-    }
 }
 
 impl LoadBuilder {
     /// Create a new [`LoadBuilder`]
-    pub fn new() -> Self {
+    pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
         Self {
-            location: None,
+            snapshot,
+            store,
             columns: None,
-            storage_options: None,
-            object_store: None,
         }
     }
 
-    /// Specify the path to the location where table data is stored,
-    /// which could be a path on distributed storage.
-    pub fn with_location(mut self, location: impl Into<String>) -> Self {
-        self.location = Some(location.into());
-        self
-    }
-
     /// Specify column selection to load
     pub fn with_columns(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
         self.columns = Some(columns.into_iter().map(|s| s.into()).collect());
         self
     }
-
-    /// Set options used to initialize storage backend
-    ///
-    /// Options may be passed in the HashMap or set as environment variables.
-    ///
-    /// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
-    /// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
-    /// [crate::builder::azure_storage_options] describes the available options for the Azure backend.
-    /// [crate::builder::gcp_storage_options] describes the available options for the Google Cloud Platform backend.
-    pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
-        self.storage_options = Some(storage_options);
-        self
-    }
-
-    /// Provide a [`DeltaObjectStore`] instance, that points at table location
-    pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
-        self.object_store = Some(object_store);
-        self
-    }
 }
 
 impl std::future::IntoFuture for LoadBuilder {
@@ -76,20 +45,31 @@ impl std::future::IntoFuture for LoadBuilder {
         let this = self;
 
         Box::pin(async move {
-            let object_store = this.object_store.unwrap();
-            let url = ensure_table_uri(object_store.root_uri())?;
-            let store = object_store.storage_backend().clone();
-            let mut table = DeltaTable::new(object_store, Default::default());
-            table.load().await?;
+            let table = DeltaTable::new_with_state(this.store, this.snapshot);
+            let schema = table.state.arrow_schema()?;
+            let projection = this
+                .columns
+                .map(|cols| {
+                    cols.iter()
+                        .map(|col| {
+                            schema.column_with_name(col).map(|(idx, _)| idx).ok_or(
+                                DeltaTableError::SchemaMismatch {
+                                    msg: format!("Column '{col}' does not exist in table schema."),
+                                },
+                            )
+                        })
+                        .collect::<Result<Vec<_>, _>>()
+                })
+                .transpose()?;
 
             let ctx = SessionContext::new();
-            ctx.state()
-                .runtime_env()
-                .register_object_store(url.scheme(), "", store);
-            let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
+            let scan_plan = table
+                .scan(&ctx.state(), projection.as_ref(), &[], None)
+                .await?;
             let plan = CoalescePartitionsExec::new(scan_plan);
             let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
             let stream = plan.execute(0, task_ctx)?;
+
             Ok((table, stream))
         })
     }
@@ -157,4 +137,34 @@ mod tests {
         assert_eq!(batch.schema(), data[0].schema());
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_load_with_columns() -> TestResult {
+        let batch = get_record_batch(None, false);
+        let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;
+
+        let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
+        let data = collect_sendable_stream(stream).await?;
+
+        let expected = vec![
+            "+----+-------+",
+            "| id | value |",
+            "+----+-------+",
+            "| A  | 1     |",
+            "| B  | 2     |",
+            "| A  | 3     |",
+            "| B  | 4     |",
+            "| A  | 5     |",
+            "| A  | 6     |",
+            "| A  | 7     |",
+            "| B  | 8     |",
+            "| B  | 9     |",
+            "| A  | 10    |",
+            "| A  | 11    |",
+            "+----+-------+",
+        ];
+
+        assert_batches_sorted_eq!(&expected, &data);
+        Ok(())
+    }
 }
diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs
index 7a554809a5..1f17d4b967 100644
--- a/rust/src/operations/mod.rs
+++ b/rust/src/operations/mod.rs
@@ -102,7 +102,7 @@ impl DeltaOps {
     #[cfg(feature = "datafusion")]
     #[must_use]
     pub fn load(self) -> LoadBuilder {
-        LoadBuilder::default().with_object_store(self.0.object_store())
+        LoadBuilder::new(self.0.object_store(), self.0.state)
     }
 
     /// Write data to Delta table
diff --git a/rust/src/operations/transaction/state.rs b/rust/src/operations/transaction/state.rs
index f521ac2062..b22c8f8e8e 100644
--- a/rust/src/operations/transaction/state.rs
+++ b/rust/src/operations/transaction/state.rs
@@ -11,6 +11,8 @@ use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
 use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource};
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use itertools::Either;
+use object_store::ObjectStore;
+use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
 use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::Parser;
 use sqlparser::tokenizer::Tokenizer;
@@ -82,6 +84,43 @@ impl DeltaTableState {
 
         Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
     }
+
+    /// Get the physical table schema.
+    ///
+    /// This will construct a schema derived from the parquet schema of the latest data file,
+    /// and fields for partition columns from the schema defined in the table metadata.
+    pub async fn physical_arrow_schema(
+        &self,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> DeltaResult<ArrowSchemaRef> {
+        if let Some(add) = self.files().iter().max_by_key(|obj| obj.modification_time) {
+            let file_meta = add.try_into()?;
+            let file_reader = ParquetObjectReader::new(object_store, file_meta);
+            let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader)
+                .await?
+                .build()?
+                .schema()
+                .clone();
+
+            let table_schema = Arc::new(ArrowSchema::new(
+                self.arrow_schema()?
+                    .fields
+                    .clone()
+                    .into_iter()
+                    .map(|field| {
+                        file_schema
+                            .field_with_name(field.name())
+                            .cloned()
+                            .unwrap_or(field)
+                    })
+                    .collect(),
+            ));
+
+            Ok(table_schema)
+        } else {
+            self.arrow_schema()
+        }
+    }
 }
 
 pub struct AddContainer<'a> {
diff --git a/rust/tests/datafusion_test.rs b/rust/tests/datafusion_test.rs
index 466ccfc733..a6d18aad7e 100644
--- a/rust/tests/datafusion_test.rs
+++ b/rust/tests/datafusion_test.rs
@@ -728,7 +728,40 @@ async fn test_datafusion_partitioned_types() -> Result<()> {
         ),
     ]);
 
-    assert_eq!(Arc::new(expected_schema), batches[0].schema());
+    assert_eq!(
+        Arc::new(expected_schema),
+        Arc::new(
+            batches[0]
+                .schema()
+                .as_ref()
+                .clone()
+                .with_metadata(Default::default())
+        )
+    );
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_datafusion_scan_timestamps() -> Result<()> {
+    let ctx = SessionContext::new();
+    let table = deltalake::open_table("./tests/data/table_with_edge_timestamps")
+        .await
+        .unwrap();
+    ctx.register_table("demo", Arc::new(table))?;
+
+    let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
+
+    let expected = vec![
+        "+-------------------------------+---------------------+------------+",
+        "| BIG_DATE                      | NORMAL_DATE         | SOME_VALUE |",
+        "+-------------------------------+---------------------+------------+",
+        "| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2          |",
+        "| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1          |",
+        "+-------------------------------+---------------------+------------+",
+    ];
+
+    assert_batches_sorted_eq!(&expected, &batches);
 
     Ok(())
 }
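
Below is a minimal usage sketch for the column-selecting load, mirroring the `test_load_with_columns` test added above. It is illustrative only: the column names and data are made up, and it assumes the crate's `arrow` re-export and the `operations::{collect_sendable_stream, DeltaOps}` exports are available with the `datafusion` feature enabled.

```rust
// Sketch: write a small batch to an in-memory Delta table, then load only a
// subset of its columns back through the updated LoadBuilder API.
use std::sync::Arc;

use deltalake::arrow::array::{ArrayRef, Int32Array, StringArray};
use deltalake::arrow::datatypes::{DataType, Field, Schema};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::operations::{collect_sendable_stream, DeltaOps};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Build a small batch with three columns (names chosen for illustration).
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Utf8, false),
        Field::new("value", DataType::Int32, false),
        Field::new("extra", DataType::Int32, false),
    ]));
    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(StringArray::from(vec!["A", "B"])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef,
        ],
    )?;

    // Write the batch to an in-memory table, then load only two columns back;
    // requesting a column that does not exist yields a SchemaMismatch error.
    let table = DeltaOps::new_in_memory().write(vec![batch]).await?;
    let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;

    // Drain the stream into record batches; only `id` and `value` are present.
    let data = collect_sendable_stream(stream).await?;
    println!("{data:?}");
    Ok(())
}
```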