feat: read schema from parquet files in datafusion scans (#1266)
# Description

This PR updates table scans with datafusion to read the file schema from
the parquet file referenced by the latest add action of the table. This
works around issues where the schema we derive from the table metadata
does not match the data in the parquet files, e.g. nanosecond vs.
microsecond timestamps.
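
For example, the scan now resolves its Arrow schema roughly like this (a minimal sketch of the new code path; the table path is a placeholder, error handling is elided, and public access to `table.state` is assumed):

```rust
// Minimal sketch: resolve the schema the way the new scan does, by reading
// the footer of the newest data file and falling back to the metadata
// schema for fields not present there (the table path is a placeholder).
let table = deltalake::open_table("./tests/data/table_with_edge_timestamps").await?;
let schema = table
    .state
    .physical_arrow_schema(table.object_store())
    .await?;
```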

We also update the `Load` command to handle column selections and make
it more consistent with the other operations; a usage sketch follows.
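
A sketch of the updated API, adapted from the new `test_load_with_columns` test (the `use` paths and the already-opened `table` are assumptions):

```rust
use deltalake::{operations::collect_sendable_stream, DeltaOps};

// Select a subset of columns while loading; unknown column names
// surface as a SchemaMismatch error.
let (_table, stream) = DeltaOps(table)
    .load()
    .with_columns(["id", "value"])
    .await?;
let data = collect_sendable_stream(stream).await?;
```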

# Related Issue(s)

closes #441


---------

Co-authored-by: Will Jones <willjones127@gmail.com>
roeap and wjones127 authored Apr 14, 2023
1 parent 216fc85 commit 362a94e
Showing 5 changed files with 140 additions and 57 deletions.
7 changes: 4 additions & 3 deletions rust/src/delta_datafusion.rs
@@ -376,9 +376,10 @@ impl TableProvider for DeltaTable {
filters: &[Expr],
limit: Option<usize>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
let schema = Arc::new(<ArrowSchema as TryFrom<&schema::Schema>>::try_from(
DeltaTable::schema(self).unwrap(),
)?);
let schema = self
.state
.physical_arrow_schema(self.object_store())
.await?;

register_store(self, session.runtime_env().clone());

114 changes: 62 additions & 52 deletions rust/src/operations/load.rs
@@ -1,71 +1,40 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::storage::DeltaObjectStore;
use crate::{builder::ensure_table_uri, DeltaResult, DeltaTable};

use datafusion::datasource::TableProvider;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use futures::future::BoxFuture;

use crate::storage::DeltaObjectStore;
use crate::table_state::DeltaTableState;
use crate::{DeltaResult, DeltaTable, DeltaTableError};

#[derive(Debug, Clone)]
pub struct LoadBuilder {
location: Option<String>,
/// A snapshot of the to-be-loaded table's state
snapshot: DeltaTableState,
/// Delta object store for handling data files
store: Arc<DeltaObjectStore>,
/// A sub-selection of columns to be loaded
columns: Option<Vec<String>>,
storage_options: Option<HashMap<String, String>>,
object_store: Option<Arc<DeltaObjectStore>>,
}

impl Default for LoadBuilder {
fn default() -> Self {
Self::new()
}
}

impl LoadBuilder {
/// Create a new [`LoadBuilder`]
pub fn new() -> Self {
pub fn new(store: Arc<DeltaObjectStore>, snapshot: DeltaTableState) -> Self {
Self {
location: None,
snapshot,
store,
columns: None,
storage_options: None,
object_store: None,
}
}

/// Specify the path to the location where table data is stored,
/// which could be a path on distributed storage.
pub fn with_location(mut self, location: impl Into<String>) -> Self {
self.location = Some(location.into());
self
}

/// Specify column selection to load
pub fn with_columns(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
self.columns = Some(columns.into_iter().map(|s| s.into()).collect());
self
}

/// Set options used to initialize storage backend
///
/// Options may be passed in the HashMap or set as environment variables.
///
/// [crate::builder::s3_storage_options] describes the available options for the AWS or S3-compliant backend.
/// [dynamodb_lock::DynamoDbLockClient] describes additional options for the AWS atomic rename client.
/// [crate::builder::azure_storage_options] describes the available options for the Azure backend.
/// [crate::builder::gcp_storage_options] describes the available options for the Google Cloud Platform backend.
pub fn with_storage_options(mut self, storage_options: HashMap<String, String>) -> Self {
self.storage_options = Some(storage_options);
self
}

/// Provide a [`DeltaObjectStore`] instance, that points at table location
pub fn with_object_store(mut self, object_store: Arc<DeltaObjectStore>) -> Self {
self.object_store = Some(object_store);
self
}
}

impl std::future::IntoFuture for LoadBuilder {
@@ -76,20 +45,31 @@ impl std::future::IntoFuture for LoadBuilder {
let this = self;

Box::pin(async move {
let object_store = this.object_store.unwrap();
let url = ensure_table_uri(object_store.root_uri())?;
let store = object_store.storage_backend().clone();
let mut table = DeltaTable::new(object_store, Default::default());
table.load().await?;
let table = DeltaTable::new_with_state(this.store, this.snapshot);
let schema = table.state.arrow_schema()?;
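// Resolve the optional column selection to schema indices; unknown
// column names surface as a SchemaMismatch error.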
let projection = this
.columns
.map(|cols| {
cols.iter()
.map(|col| {
schema.column_with_name(col).map(|(idx, _)| idx).ok_or(
DeltaTableError::SchemaMismatch {
msg: format!("Column '{col}' does not exist in table schema."),
},
)
})
.collect::<Result<_, _>>()
})
.transpose()?;

let ctx = SessionContext::new();
ctx.state()
.runtime_env()
.register_object_store(url.scheme(), "", store);
let scan_plan = table.scan(&ctx.state(), None, &[], None).await?;
let scan_plan = table
.scan(&ctx.state(), projection.as_ref(), &[], None)
.await?;
let plan = CoalescePartitionsExec::new(scan_plan);
let task_ctx = Arc::new(TaskContext::from(&ctx.state()));
let stream = plan.execute(0, task_ctx)?;

Ok((table, stream))
})
}
@@ -157,4 +137,34 @@ mod tests {
assert_eq!(batch.schema(), data[0].schema());
Ok(())
}

#[tokio::test]
async fn test_load_with_columns() -> TestResult {
let batch = get_record_batch(None, false);
let table = DeltaOps::new_in_memory().write(vec![batch.clone()]).await?;

let (_table, stream) = DeltaOps(table).load().with_columns(["id", "value"]).await?;
let data = collect_sendable_stream(stream).await?;

let expected = vec![
"+----+-------+",
"| id | value |",
"+----+-------+",
"| A | 1 |",
"| B | 2 |",
"| A | 3 |",
"| B | 4 |",
"| A | 5 |",
"| A | 6 |",
"| A | 7 |",
"| B | 8 |",
"| B | 9 |",
"| A | 10 |",
"| A | 11 |",
"+----+-------+",
];

assert_batches_sorted_eq!(&expected, &data);
Ok(())
}
}
2 changes: 1 addition & 1 deletion rust/src/operations/mod.rs
@@ -102,7 +102,7 @@ impl DeltaOps {
#[cfg(feature = "datafusion")]
#[must_use]
pub fn load(self) -> LoadBuilder {
LoadBuilder::default().with_object_store(self.0.object_store())
LoadBuilder::new(self.0.object_store(), self.0.state)
}

/// Write data to Delta table
39 changes: 39 additions & 0 deletions rust/src/operations/transaction/state.rs
@@ -11,6 +11,8 @@ use datafusion_common::{Column, DFSchema, Result as DFResult, TableReference};
use datafusion_expr::{AggregateUDF, Expr, ScalarUDF, TableSource};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use itertools::Either;
use object_store::ObjectStore;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;
@@ -82,6 +84,43 @@ impl DeltaTableState {

Ok(sql_to_rel.sql_to_expr(sql, &df_schema, &mut Default::default())?)
}

/// Get the physical table schema.
///
/// This will construct a schema derived from the parquet schema of the latest data file,
/// and fields for partition columns from the schema defined in the table metadata.
pub async fn physical_arrow_schema(
&self,
object_store: Arc<dyn ObjectStore>,
) -> DeltaResult<ArrowSchemaRef> {
if let Some(add) = self.files().iter().max_by_key(|obj| obj.modification_time) {
let file_meta = add.try_into()?;
let file_reader = ParquetObjectReader::new(object_store, file_meta);
let file_schema = ParquetRecordBatchStreamBuilder::new(file_reader)
.await?
.build()?
.schema()
.clone();

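// Merge: keep the parquet file's physical type for every field present
// in the data file; fields missing from it (e.g. partition columns,
// which are not stored in data files) fall back to the metadata field.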
let table_schema = Arc::new(ArrowSchema::new(
self.arrow_schema()?
.fields
.clone()
.into_iter()
.map(|field| {
file_schema
.field_with_name(field.name())
.cloned()
.unwrap_or(field)
})
.collect(),
));

Ok(table_schema)
} else {
self.arrow_schema()
}
}
}

pub struct AddContainer<'a> {
35 changes: 34 additions & 1 deletion rust/tests/datafusion_test.rs
@@ -728,7 +728,40 @@ async fn test_datafusion_partitioned_types() -> Result<()> {
),
]);

assert_eq!(Arc::new(expected_schema), batches[0].schema());
assert_eq!(
Arc::new(expected_schema),
Arc::new(
batches[0]
.schema()
.as_ref()
.clone()
.with_metadata(Default::default())
)
);

Ok(())
}

#[tokio::test]
async fn test_datafusion_scan_timestamps() -> Result<()> {
let ctx = SessionContext::new();
let table = deltalake::open_table("./tests/data/table_with_edge_timestamps")
.await
.unwrap();
ctx.register_table("demo", Arc::new(table))?;

let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;

let expected = vec![
"+-------------------------------+---------------------+------------+",
"| BIG_DATE | NORMAL_DATE | SOME_VALUE |",
"+-------------------------------+---------------------+------------+",
"| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2 |",
"| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1 |",
"+-------------------------------+---------------------+------------+",
];

assert_batches_sorted_eq!(&expected, &batches);

Ok(())
}
