DataFusion TableProvider for memory arrays (#384)
Implements a read-only DataFusion `TableProvider` backed by an in-memory array of
`DType::Struct` rows. See the tests in `vortex-datafusion/lib.rs` for an example of
how to read from a SQL context.
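
For orientation, usage follows the standard DataFusion pattern. The sketch below is illustrative only: the provider argument stands in for whatever `TableProvider` type this commit introduces, and the committed tests in `vortex-datafusion/lib.rs` remain the authoritative example.

```rust
// Illustrative only: `provider` is whichever TableProvider this commit adds for
// in-memory Vortex struct arrays; see the tests in vortex-datafusion/lib.rs.
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

async fn query_vortex(provider: Arc<dyn TableProvider>) -> Result<()> {
    let ctx = SessionContext::new();
    // Register the Vortex-backed table under a name visible to SQL.
    ctx.register_table("vortex_tbl", provider)?;
    // Standard DataFusion execution: the provider hands back Arrow record batches.
    let batches = ctx
        .sql("SELECT * FROM vortex_tbl LIMIT 10")
        .await?
        .collect()
        .await?;
    for batch in &batches {
        println!("{} rows", batch.num_rows());
    }
    Ok(())
}
```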


* Adds a new `vortex-datafusion` crate. Types are still private until we're more comfortable exposing them publicly.
* Adds new cfg-flagged converters in `vortex-dtype` to convert from `DType` to an arrow `DataType` and arrow `Schema` (see the sketch after this list).
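
For a rough sense of what those converters do, here is a toy sketch using a simplified stand-in type. It is not the actual `vortex-dtype` API: the real converters operate on `DType` itself and derive nullability from it, which is hardcoded to `true` here for brevity.

```rust
// Toy illustration only: a simplified stand-in for DType, not the real vortex-dtype API.
use arrow_schema::{DataType, Field, Schema};

enum ToyDType {
    Bool,
    I64,
    Utf8,
    Struct(Vec<(String, ToyDType)>),
}

// Each logical type maps onto the closest Arrow DataType; struct fields recurse.
fn to_arrow_type(dtype: &ToyDType) -> DataType {
    match dtype {
        ToyDType::Bool => DataType::Boolean,
        ToyDType::I64 => DataType::Int64,
        ToyDType::Utf8 => DataType::Utf8,
        ToyDType::Struct(fields) => DataType::Struct(
            fields
                .iter()
                .map(|(name, dt)| Field::new(name.as_str(), to_arrow_type(dt), true))
                .collect(),
        ),
    }
}

// A top-level struct DType becomes an Arrow Schema with one Field per struct member.
fn to_arrow_schema(fields: &[(String, ToyDType)]) -> Schema {
    Schema::new(
        fields
            .iter()
            .map(|(name, dt)| Field::new(name.as_str(), to_arrow_type(dt), true))
            .collect::<Vec<Field>>(),
    )
}
```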

Support for pushing down filters against Vortex arrays will come in a follow-up (FLUP) tomorrow.
a10y authored Jun 18, 2024
1 parent 81cb466 commit 5fbc1af
Showing 30 changed files with 1,490 additions and 664 deletions.
812 changes: 561 additions & 251 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
@@ -6,6 +6,7 @@ members = [
"pyvortex",
"vortex-array",
"vortex-buffer",
"vortex-datafusion",
"vortex-dtype",
"vortex-error",
"vortex-expr",
@@ -36,20 +37,27 @@ arrayref = "0.3.7"
arrow = { version = "52.0.0", features = ["pyarrow"] }
arrow-array = "52.0.0"
arrow-buffer = "52.0.0"
arrow-cast = "52.0.0"
arrow-csv = "52.0.0"
arrow-data = "52.0.0"
arrow-ipc = "52.0.0"
arrow-schema = "52.0.0"
arrow-select = "52.0.0"
async-trait = "0.1"
bindgen = "0.69.4"
bytes = "1.6.0"
bzip2 = "0.4.4"
cargo_metadata = "0.18.1"
criterion = { version = "0.5.1", features = ["html_reports"] }
croaring = "1.0.1"
csv = "1.3.0"
datafusion = "39.0.0"
datafusion-common = "39.0.0"
datafusion-execution = "39.0.0"
datafusion-expr = "39.0.0"
datafusion-physical-expr = "39.0.0"
datafusion-physical-plan = "39.0.0"
derive_builder = "0.20.0"
divan = "0.1.14"
duckdb = { version = "0.10.1", features = ["bundled"] }
enum-iterator = "2.0.0"
2 changes: 1 addition & 1 deletion bench-vortex/Cargo.toml
@@ -28,7 +28,7 @@ log = { workspace = true }
parquet = { workspace = true, features = [] }
reqwest = { workspace = true }
simplelog = { workspace = true }
tokio = { workspace = true }
tokio = { workspace = true, features = ["full"] }
uuid = { workspace = true, features = ["v4"] }
vortex-alp = { path = "../encodings/alp" }
vortex-array = { path = "../vortex-array" }
5 changes: 2 additions & 3 deletions bench-vortex/src/lib.rs
@@ -216,7 +216,6 @@ mod test {
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use vortex::arrow::FromArrowArray;
use vortex::compress::Compressor;
use vortex::compute::as_arrow::as_arrow;
use vortex::{ArrayData, IntoArray};

use crate::taxi_data::taxi_data_parquet;
@@ -240,7 +239,7 @@
let struct_arrow: ArrowStructArray = record_batch.into();
let arrow_array: ArrowArrayRef = Arc::new(struct_arrow);
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();
let vortex_as_arrow = as_arrow(&vortex_array).unwrap();
let vortex_as_arrow = vortex_array.flatten().unwrap().into_arrow();
assert_eq!(vortex_as_arrow.deref(), arrow_array.deref());
}
}
@@ -260,7 +259,7 @@
let vortex_array = ArrayData::from_arrow(arrow_array.clone(), false).into_array();

let compressed = Compressor::new(&CTX).compress(&vortex_array, None).unwrap();
let compressed_as_arrow = as_arrow(&compressed).unwrap();
let compressed_as_arrow = compressed.flatten().unwrap().into_arrow();
assert_eq!(compressed_as_arrow.deref(), arrow_array.deref());
}
}
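The change above swaps the removed `as_arrow` compute function for the flatten-then-convert path. A minimal sketch of the new call pattern, under the assumption that `flatten()` is callable directly on `Array` (as it appears in these hunks) and with error handling left to the caller:

```rust
// Sketch of the conversion path this commit standardizes on: flatten the Vortex array
// to a canonical encoding, then convert that canonical form into an Arrow ArrayRef.
use arrow_array::ArrayRef;
use vortex::Array;
use vortex_error::VortexResult;

fn to_arrow(array: Array) -> VortexResult<ArrayRef> {
    // flatten() canonicalizes any encoding; into_arrow() then builds the Arrow array.
    Ok(array.flatten()?.into_arrow())
}
```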
15 changes: 10 additions & 5 deletions pyvortex/src/vortex_arrow.rs
@@ -1,22 +1,27 @@
use arrow::array::Array as ArrowArray;
use arrow::array::{Array as ArrowArray, ArrayRef};
use arrow::error::ArrowError;
use arrow::pyarrow::ToPyArrow;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyList};
use vortex::compute::as_arrow::as_arrow_chunks;
use vortex::array::chunked::ChunkedArray;
use vortex::Array;

use crate::error::PyVortexError;

pub fn map_arrow_err(error: ArrowError) -> PyErr {
PyValueError::new_err(error.to_string())
}

pub fn export_array<'py>(py: Python<'py>, array: &Array) -> PyResult<Bound<'py, PyAny>> {
// NOTE(ngates): for struct arrays, we could also return a RecordBatchStreamReader.
// NOTE(robert): Return RecordBatchStreamReader always?
let chunks = as_arrow_chunks(array).map_err(PyVortexError::map_err)?;
let chunks: Vec<ArrayRef> = if let Ok(chunked_array) = ChunkedArray::try_from(array) {
chunked_array
.chunks()
.map(|chunk| chunk.flatten().unwrap().into_arrow())
.collect()
} else {
vec![array.clone().flatten().unwrap().into_arrow()]
};
if chunks.is_empty() {
return Err(PyValueError::new_err("No chunks in array"));
}
1 change: 1 addition & 0 deletions vortex-array/Cargo.toml
@@ -21,6 +21,7 @@ workspace = true
[dependencies]
arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-cast = { workspace = true }
arrow-schema = { workspace = true }
enum-iterator = { workspace = true }
flatbuffers = { workspace = true }
17 changes: 0 additions & 17 deletions vortex-array/src/array/bool/compute/as_arrow.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/bool/compute/mod.rs
@@ -1,13 +1,11 @@
use crate::array::bool::BoolArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::compare::CompareFn;
use crate::compute::fill::FillForwardFn;
use crate::compute::scalar_at::ScalarAtFn;
use crate::compute::slice::SliceFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;

mod as_arrow;
mod compare;
mod fill;
mod flatten;
@@ -16,10 +14,6 @@ mod slice;
mod take;

impl ArrayCompute for BoolArray {
fn as_arrow(&self) -> Option<&dyn AsArrowArray> {
Some(self)
}

fn compare(&self) -> Option<&dyn CompareFn> {
Some(self)
}
31 changes: 2 additions & 29 deletions vortex-array/src/array/datetime/localdatetime.rs
@@ -1,20 +1,10 @@
use std::sync::Arc;

use arrow_array::{
ArrayRef as ArrowArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray,
TimestampNanosecondArray, TimestampSecondArray,
};
use arrow_buffer::ScalarBuffer;
use lazy_static::lazy_static;
use vortex_dtype::{DType, ExtDType, ExtID, PType};
use vortex_dtype::{DType, ExtDType, ExtID};
use vortex_error::{vortex_bail, vortex_err, VortexError, VortexResult};

use crate::array::datetime::TimeUnit;
use crate::array::extension::ExtensionArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::cast::cast;
use crate::validity::ArrayValidity;
use crate::{Array, ArrayDType, ArrayData, ArrayTrait, IntoArrayData};
use crate::{Array, ArrayDType, ArrayData, IntoArrayData};

lazy_static! {
pub static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID);
@@ -83,23 +73,6 @@ impl TryFrom<&ExtensionArray> for LocalDateTimeArray {
}
}

impl AsArrowArray for LocalDateTimeArray {
fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
// A LocalDateTime maps to an Arrow Timestamp array with no timezone.
let timestamps = cast(&self.timestamps(), PType::I64.into())?.flatten_primitive()?;
let validity = timestamps.logical_validity().to_null_buffer()?;
let timestamps_len = timestamps.len();
let buffer = ScalarBuffer::<i64>::new(timestamps.into_buffer().into(), 0, timestamps_len);

Ok(match self.time_unit() {
TimeUnit::Ns => Arc::new(TimestampNanosecondArray::new(buffer, validity)),
TimeUnit::Us => Arc::new(TimestampMicrosecondArray::new(buffer, validity)),
TimeUnit::Ms => Arc::new(TimestampMillisecondArray::new(buffer, validity)),
TimeUnit::S => Arc::new(TimestampSecondArray::new(buffer, validity)),
})
}
}

impl TryFrom<&Array> for LocalDateTimeArray {
type Error = VortexError;

21 changes: 1 addition & 20 deletions vortex-array/src/array/extension/compute.rs
@@ -1,10 +1,7 @@
use arrow_array::ArrayRef as ArrowArrayRef;
use vortex_error::{vortex_bail, VortexResult};
use vortex_error::VortexResult;
use vortex_scalar::Scalar;

use crate::array::datetime::LocalDateTimeArray;
use crate::array::extension::ExtensionArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::cast::CastFn;
use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::slice::{slice, SliceFn};
@@ -13,10 +10,6 @@ use crate::compute::ArrayCompute;
use crate::{Array, IntoArray};

impl ArrayCompute for ExtensionArray {
fn as_arrow(&self) -> Option<&dyn AsArrowArray> {
Some(self)
}

fn cast(&self) -> Option<&dyn CastFn> {
// It's not possible to cast an extension array to another type.
// TODO(ngates): we should allow some extension arrays to implement a callback
@@ -37,18 +30,6 @@ impl ArrayCompute for ExtensionArray {
}
}

impl AsArrowArray for ExtensionArray {
/// To support full compatability with Arrow, we hard-code the conversion of our datetime
/// arrays to Arrow's Timestamp arrays here. For all other extension arrays, we return an
/// Arrow extension array with the same definition.
fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
match self.id().as_ref() {
"vortex.localdatetime" => LocalDateTimeArray::try_from(self)?.as_arrow(),
_ => vortex_bail!("Arrow extension arrays not yet supported"),
}
}
}

impl ScalarAtFn for ExtensionArray {
fn scalar_at(&self, index: usize) -> VortexResult<Scalar> {
Ok(Scalar::extension(
48 changes: 0 additions & 48 deletions vortex-array/src/array/null/as_arrow.rs

This file was deleted.

1 change: 0 additions & 1 deletion vortex-array/src/array/null/mod.rs
@@ -5,7 +5,6 @@ use crate::validity::{ArrayValidity, LogicalValidity, Validity};
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
use crate::{impl_encoding, ArrayFlatten};

mod as_arrow;
mod compute;

impl_encoding!("vortex.null", Null);
41 changes: 0 additions & 41 deletions vortex-array/src/array/primitive/compute/as_arrow.rs

This file was deleted.

11 changes: 4 additions & 7 deletions vortex-array/src/array/primitive/compute/filter_indices.rs
@@ -68,7 +68,6 @@ fn apply_predicate<T: NativePType, F: Fn(&T, &T) -> bool>(

#[cfg(test)]
mod test {
use itertools::Itertools;
use vortex_dtype::field::FieldPath;
use vortex_expr::{lit, Conjunction, FieldPathOperations};

@@ -80,13 +79,11 @@ mod test {
}

fn to_int_indices(filtered_primitive: BoolArray) -> Vec<u64> {
let filtered = filtered_primitive
filtered_primitive
.boolean_buffer()
.iter()
.enumerate()
.flat_map(|(idx, v)| if v { Some(idx as u64) } else { None })
.collect_vec();
filtered
.set_indices()
.map(|i| i as u64)
.collect()
}

#[test]
Expand Down
6 changes: 0 additions & 6 deletions vortex-array/src/array/primitive/compute/mod.rs
@@ -1,5 +1,4 @@
use crate::array::primitive::PrimitiveArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::cast::CastFn;
use crate::compute::compare::CompareFn;
use crate::compute::fill::FillForwardFn;
@@ -11,7 +10,6 @@ use crate::compute::slice::SliceFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;

mod as_arrow;
mod cast;
mod compare;
mod fill;
@@ -23,10 +21,6 @@ mod subtract_scalar;
mod take;

impl ArrayCompute for PrimitiveArray {
fn as_arrow(&self) -> Option<&dyn AsArrowArray> {
Some(self)
}

fn cast(&self) -> Option<&dyn CastFn> {
Some(self)
}
