From 88949d5c03394395ea4a9d3cc11f8c84b0684a6e Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Jun 2024 08:52:36 -0400 Subject: [PATCH 01/16] save --- vortex-datafusion/src/lib.rs | 100 ++++++++++++++++++++++++++++++++--- 1 file changed, 94 insertions(+), 6 deletions(-) diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index ba7330848f..6361d36869 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -9,13 +9,20 @@ use std::task::{Context, Poll}; use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; use arrow_schema::SchemaRef; use async_trait::async_trait; +use datafusion::arrow::buffer::NullBuffer; use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::prelude::SessionContext; -use datafusion_common::{exec_datafusion_err, exec_err, DataFusionError, Result as DFResult}; -use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; +use datafusion_common::{ + exec_datafusion_err, DataFusionError, Result as DFResult, ScalarValue, ToDFSchema, +}; +use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyContext; +use datafusion_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, @@ -99,9 +106,16 @@ impl TableProvider for VortexInMemoryTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - if !filters.is_empty() { - return exec_err!("vortex does not support filter pushdown"); - } + let filter_expr = if filters.is_empty() { + None + } else { + Some(make_simplified_conjunction( + filters, + self.schema_ref.clone(), + )?) + }; + + println!("simplified filter: {filter_expr:?}"); let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { Partitioning::RoundRobinBatch(chunked_array.nchunks()) @@ -118,6 +132,7 @@ impl TableProvider for VortexInMemoryTableProvider { Ok(Arc::new(VortexMemoryExec { array: self.array.clone(), projection: projection.cloned(), + filter_expr, plan_properties, })) } @@ -129,15 +144,88 @@ impl TableProvider for VortexInMemoryTableProvider { // TODO(aduffy): add support for filter pushdown Ok(filters .iter() - .map(|_| TableProviderFilterPushDown::Unsupported) + .map(|expr| { + match expr { + // Several expressions can be pushed down. + Expr::BinaryExpr(_) + | Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + | Expr::Cast(_) => TableProviderFilterPushDown::Exact, + + // All other expressions should be handled outside of the TableProvider + // via the normal DataFusion operator chain. + _ => TableProviderFilterPushDown::Unsupported, + } + + TableProviderFilterPushDown::Exact + }) .collect()) } } +struct ValidationVisitor {} + +impl ValidationVisitor { + +} + +impl TreeNodeVisitor for ValidationVisitor { + type Node = Expr; + + fn f_down(&mut self, node: &Self::Node) -> DFResult { + + } +} + +/// A mask determining the rows in an Array that should be treated as valid for query processing. 
+/// The vector is used to determine the take order of a set of things, or otherwise we determine +/// that we want to perform cross-filtering of the larger columns, if we so choose. +pub(crate) struct RowSelection { + selection: NullBuffer, +} + +/// Convert a set of expressions that must all match into a single AND expression. +/// +/// # Returns +/// +/// If conversion is successful, the result will be a +/// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction. +/// +/// Note that the set of operators must be provided here instead. +/// +/// # Simplification +/// +/// Simplification will occur as part of this process, so constant folding and similar optimizations +/// will be applied before returning the final expression. +fn make_simplified_conjunction(filters: &[Expr], schema: SchemaRef) -> DFResult { + let init = Box::new(Expr::Literal(ScalarValue::Boolean(Some(true)))); + let conjunction = filters.iter().fold(init, |conj, item| { + Box::new(Expr::BinaryExpr(BinaryExpr::new( + conj, + Operator::And, + Box::new(item.clone()), + ))) + }); + + let schema = schema.to_dfschema_ref()?; + + // simplify the expression. + let props = ExecutionProps::new(); + let context = SimplifyContext::new(&props).with_schema(schema); + let simplifier = ExprSimplifier::new(context); + + simplifier.simplify(*conjunction) +} + /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. #[derive(Debug, Clone)] struct VortexMemoryExec { array: Array, + filter_expr: Option, projection: Option>, plan_properties: PlanProperties, } From 998bc55683f4683f40771de9bd2daca6d33a8189 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Jun 2024 14:12:13 -0400 Subject: [PATCH 02/16] in progress --- Cargo.lock | 4 + .../src/array/bool/compute/compare.rs | 1 + vortex-array/src/array/constant/mod.rs | 2 + vortex-array/src/array/struct_/mod.rs | 13 +- vortex-array/src/stats/statsset.rs | 31 ++ vortex-datafusion/Cargo.toml | 9 +- vortex-datafusion/src/lib.rs | 366 ++++++++++++------ vortex-datafusion/src/plans.rs | 205 ++++++++++ 8 files changed, 513 insertions(+), 118 deletions(-) create mode 100644 vortex-datafusion/src/plans.rs diff --git a/Cargo.lock b/Cargo.lock index 5431681566..3fd8d68809 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3848,11 +3848,15 @@ dependencies = [ "datafusion-physical-expr", "datafusion-physical-plan", "futures", + "itertools 0.13.0", + "lazy_static", "pin-project", "tokio", "vortex-array", "vortex-dtype", "vortex-error", + "vortex-expr", + "vortex-scalar", ] [[package]] diff --git a/vortex-array/src/array/bool/compute/compare.rs b/vortex-array/src/array/bool/compute/compare.rs index d333c9cc33..a63e432cae 100644 --- a/vortex-array/src/array/bool/compute/compare.rs +++ b/vortex-array/src/array/bool/compute/compare.rs @@ -8,6 +8,7 @@ use crate::compute::compare::CompareFn; use crate::{Array, ArrayTrait, IntoArray, IntoArrayVariant}; impl CompareFn for BoolArray { + // TODO(aduffy): replace these with Arrow compute kernels. 
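+    // A possible shape for that change (untested sketch): canonicalize both sides to Arrow
+    // arrays and dispatch to the `arrow_ord::cmp` kernels (eq/neq/lt/lt_eq/gt/gt_eq), which
+    // already handle null/validity semantics. Roughly:
+    //
+    //     let lhs = self.clone().into_canonical()?.into_arrow();
+    //     let rhs = other.clone().into_canonical()?.into_arrow();
+    //     let result = arrow_ord::cmp::eq(&lhs, &rhs)?; // ...kernel selected from `op`
+    //     Ok(ArrayData::from_arrow(&result, true).into_array())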
fn compare(&self, other: &Array, op: Operator) -> VortexResult { let flattened = other.clone().into_bool()?; let lhs = self.boolean_buffer(); diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs index a51f2574be..d275b64e37 100644 --- a/vortex-array/src/array/constant/mod.rs +++ b/vortex-array/src/array/constant/mod.rs @@ -25,6 +25,8 @@ impl ConstantArray { Scalar: From, { let scalar: Scalar = scalar.into(); + // TODO(aduffy): add stats for bools, ideally there should be a + // StatsSet::constant(Scalar) constructor that does this for us, like StatsSet::nulls. let stats = StatsSet::from(HashMap::from([ (Stat::Max, scalar.clone()), (Stat::Min, scalar.clone()), diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 86a74c5092..7ffa0ceda2 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; use vortex_dtype::{FieldName, FieldNames, Nullability, StructDType}; -use vortex_error::{vortex_bail, vortex_err}; +use vortex_error::vortex_bail; use crate::stats::ArrayStatisticsCompute; use crate::validity::{ArrayValidity, LogicalValidity, Validity, ValidityMetadata}; @@ -27,6 +27,15 @@ impl StructArray { self.array().child(idx, dtype) } + pub fn field_by_name(&self, name: &str) -> Option { + let field_idx = self + .names() + .iter() + .position(|field_name| field_name.as_ref() == name); + + field_idx.and_then(|field_idx| self.field(field_idx)) + } + pub fn names(&self) -> &FieldNames { let DType::Struct(st, _) = self.dtype() else { unreachable!() @@ -126,7 +135,7 @@ impl StructArray { for column_idx in projection { children.push( self.field(*column_idx) - .ok_or_else(|| vortex_err!(InvalidArgument: "column index out of bounds"))?, + .expect("column must not exceed bounds"), ); names.push(self.names()[*column_idx].clone()); } diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index cc5dec36d5..02c32b78ca 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use enum_iterator::all; use itertools::Itertools; + use vortex_dtype::DType; use vortex_error::VortexError; use vortex_scalar::Scalar; @@ -27,6 +28,36 @@ impl StatsSet { } } + // pub fn constant(len: usize, scalar: &Scalar) -> Self { + // let mut stats = HashMap::from([ + // (Stat::Max, scalar.clone()), + // (Stat::Min, scalar.clone()), + // (Stat::IsConstant, true.into()), + // (Stat::IsSorted, true.into()), + // (Stat::RunCount, 1.into()), + // ]); + // + // match scalar.dtype() { + // DType::Bool(_) => { + // stats.insert(Stat::TrueCount, 0.into()); + // } + // DType::Primitive(ptype, _) => { + // ptype.byte_width(); + // stats.insert( + // Stat::BitWidthFreq, + // vec![0; ptype.byte_width() * 8 + 1].into(), + // ); + // stats.insert( + // Stat::TrailingZeroFreq, + // vec![ptype.byte_width() * 8; ptype.byte_width() * 8 + 1].into(), + // ); + // } + // _ => {} + // } + // + // Self::from(stats) + // } + /// Specialized constructor for the case where the StatsSet represents /// an array consisting entirely of [null](vortex_dtype::DType::Null) values. 
pub fn nulls(len: usize, dtype: &DType) -> Self { diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index 72c4caa5b0..f013abbf5c 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -13,7 +13,9 @@ rust-version.workspace = true [dependencies] vortex-array = { path = "../vortex-array" } vortex-dtype = { path = "../vortex-dtype" } +vortex-expr = { path = "../vortex-expr" } vortex-error = { path = "../vortex-error" } +vortex-scalar = { path = "../vortex-scalar" } arrow-array = { workspace = true } arrow-schema = { workspace = true } @@ -26,10 +28,13 @@ datafusion-execution = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } futures = { workspace = true } +itertools = { workspace = true } +lazy_static = { workspace = true } pin-project = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } -[lints] -workspace = true +# TODO(aduffy): re-enable +#[lints] +#workspace = true diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index 6361d36869..00adf82759 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -1,7 +1,8 @@ //! Connectors to enable DataFusion to read Vortex data. use std::any::Any; -use std::fmt::Formatter; +use std::collections::HashSet; +use std::fmt::{Debug, Formatter}; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; @@ -16,28 +17,28 @@ use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::prelude::SessionContext; -use datafusion_common::{ - exec_datafusion_err, DataFusionError, Result as DFResult, ScalarValue, ToDFSchema, -}; -use datafusion_common::tree_node::{TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult, ToDFSchema}; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::simplify::SimplifyContext; -use datafusion_expr::{BinaryExpr, Expr, Operator, TableProviderFilterPushDown, TableType}; +use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties, }; use futures::{Stream, StreamExt}; +use itertools::Itertools; use pin_project::pin_project; +use vortex::array::bool::BoolArray; use vortex::array::chunked::ChunkedArray; -use vortex::array::struct_::StructArray; -use vortex::{Array, ArrayDType, IntoArray, IntoCanonical}; +use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use crate::datatype::infer_schema; mod datatype; +mod plans; pub trait SessionContextExt { fn read_vortex(&self, array: Array) -> DFResult; @@ -106,16 +107,28 @@ impl TableProvider for VortexInMemoryTableProvider { filters: &[Expr], _limit: Option, ) -> DFResult> { - let filter_expr = if filters.is_empty() { + fn get_filter_projection(exprs: &[Expr], schema: SchemaRef) -> Vec { + let referenced_columns: HashSet = + exprs.iter().flat_map(get_column_references).collect(); + + let projection: Vec = referenced_columns + .iter() + .map(|col_name| 
schema.column_with_name(col_name).unwrap().0) + .sorted() + .collect(); + + projection + } + + let filter_exprs: Option> = if filters.is_empty() { None } else { - Some(make_simplified_conjunction( - filters, - self.schema_ref.clone(), - )?) + Some(filters.iter().cloned().collect()) }; - println!("simplified filter: {filter_expr:?}"); + let filter_projection = filter_exprs + .clone() + .map(|exprs| get_filter_projection(exprs.as_slice(), self.schema_ref.clone())); let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { Partitioning::RoundRobinBatch(chunked_array.nchunks()) @@ -129,56 +142,164 @@ impl TableProvider for VortexInMemoryTableProvider { ExecutionMode::Bounded, ); - Ok(Arc::new(VortexMemoryExec { - array: self.array.clone(), - projection: projection.cloned(), - filter_expr, - plan_properties, - })) + match (filter_exprs, filter_projection) { + // If there is a filter expression, we execute in two phases, first performing a filter + // on the input to get back row indices, and then taking the remaning struct columns + // using the calculcated indices from the filter. + (Some(filter_exprs), Some(filter_projection)) => Ok(make_filter_then_take_plan( + self.schema_ref.clone(), + filter_exprs, + filter_projection, + self.array.clone(), + projection.clone(), + plan_properties, + )), + + // If no filters were pushed down, we materialize the entire StructArray into a + // RecordBatch and let DataFusion process the entire query. + _ => Ok(Arc::new(VortexScanExec { + array: self.array.clone(), + filter_exprs: None, + filter_projection: None, + scan_projection: projection.cloned(), + plan_properties, + })), + } } fn supports_filters_pushdown( &self, filters: &[&Expr], ) -> DFResult> { - // TODO(aduffy): add support for filter pushdown - Ok(filters + // Get the set of column filters supported. + let schema_columns: HashSet = self + .schema_ref + .fields + .iter() + .map(|field| field.name().clone()) + .collect(); + + filters .iter() .map(|expr| { - match expr { - // Several expressions can be pushed down. - Expr::BinaryExpr(_) - | Expr::IsNotNull(_) - | Expr::IsNull(_) - | Expr::IsTrue(_) - | Expr::IsFalse(_) - | Expr::IsNotTrue(_) - | Expr::IsNotFalse(_) - | Expr::Cast(_) => TableProviderFilterPushDown::Exact, - - // All other expressions should be handled outside of the TableProvider - // via the normal DataFusion operator chain. - _ => TableProviderFilterPushDown::Unsupported, + if can_be_pushed_down(*expr, &schema_columns)? { + Ok(TableProviderFilterPushDown::Exact) + } else { + Ok(TableProviderFilterPushDown::Unsupported) } - - TableProviderFilterPushDown::Exact }) - .collect()) + .try_collect() } } -struct ValidationVisitor {} +/// Construct an operator plan that executes in two stages. +/// +/// The first plan stage only materializes the columns related to the provided set of filter +/// expressions. It evaluates the filters into a row selection. +/// +/// The second stage receives the row selection above and dispatches a `take` on the remaining +/// columns. +fn make_filter_then_take_plan( + _schema: SchemaRef, + _filter_exprs: Vec, + _filter_projection: Vec, + _array: Array, + _output_projection: Option<&Vec>, + _plan_properties: PlanProperties, +) -> Arc { + // Create a struct array necessary to run the filter operations. + + todo!() +} -impl ValidationVisitor { +/// Check if the given expression tree can be pushed down into the scan. 
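+///
+/// For example (illustrative, matching the `is_supported` rules below): a comparison over a
+/// known column such as `a >= 4` can be pushed down, while `a LIKE 'foo%'`, arithmetic such
+/// as `a + b`, conjunctions like `a = 1 AND b = 2`, and references to columns outside the
+/// schema are all rejected and left to DataFusion's normal operator chain.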
+fn can_be_pushed_down(expr: &Expr, schema_columns: &HashSet) -> DFResult { + // If the filter references a column not known to our schema, we reject the filter for pushdown. + // TODO(aduffy): is this necessary? Under what conditions would this happen? + let column_refs = get_column_references(expr); + if !column_refs.is_subset(&schema_columns) { + return Ok(false); + } -} + fn is_supported(expr: &Expr) -> bool { + match expr { + Expr::BinaryExpr(binary_expr) => { + // Both the left and right sides must be column expressions, scalars, or casts. + + match binary_expr.op { + // Initially, we will only support pushdown for basic boolean operators + Operator::Eq + | Operator::NotEq + | Operator::Lt + | Operator::LtEq + | Operator::Gt + | Operator::GtEq => true, + + // TODO(aduffy): add support for LIKE + // TODO(aduffy): add support for basic mathematical ops +-*/ + // TODO(aduffy): add support for conjunctions, assuming all of the + // left and right are valid expressions. + _ => false, + } + } + Expr::IsNotNull(_) + | Expr::IsNull(_) + | Expr::IsTrue(_) + | Expr::IsFalse(_) + | Expr::IsNotTrue(_) + | Expr::IsNotFalse(_) + // TODO(aduffy): ensure that cast can be pushed down. + | Expr::Cast(_) => true, + _ => false, + } + } + + // Visitor that traverses the expression tree and tracks if any unsupported expressions were + // encountered. + struct IsSupportedVisitor { + supported_expressions_only: bool, + } -impl TreeNodeVisitor for ValidationVisitor { - type Node = Expr; + impl TreeNodeVisitor<'_> for IsSupportedVisitor { + type Node = Expr; - fn f_down(&mut self, node: &Self::Node) -> DFResult { + fn f_down(&mut self, node: &Self::Node) -> DFResult { + if !is_supported(node) { + self.supported_expressions_only = false; + return Ok(TreeNodeRecursion::Stop); + } + Ok(TreeNodeRecursion::Continue) + } } + + let mut visitor = IsSupportedVisitor { + supported_expressions_only: true, + }; + + // Traverse the tree. + // At the end of the traversal, the internal state of `visitor` will indicate if there were + // unsupported expressions encountered. + expr.visit(&mut visitor)?; + + Ok(visitor.supported_expressions_only) +} + +/// Extract out the columns from our table referenced by the expression. +fn get_column_references(expr: &Expr) -> HashSet { + let mut references = HashSet::new(); + + expr.apply(|node| match node { + Expr::Column(col) => { + references.insert(col.name.clone()); + + Ok(TreeNodeRecursion::Continue) + } + _ => Ok(TreeNodeRecursion::Continue), + }) + .unwrap(); + + references } /// A mask determining the rows in an Array that should be treated as valid for query processing. @@ -188,29 +309,46 @@ pub(crate) struct RowSelection { selection: NullBuffer, } +impl RowSelection { + /// Construct a new RowSelection with all elements initialized to selected (true). + pub(crate) fn new_selected(len: usize) -> Self { + Self { + selection: NullBuffer::new_valid(len), + } + } + + /// Construct a new RowSelection with all elements initialized to unselected (false). + pub(crate) fn new_unselected(len: usize) -> Self { + Self { + selection: NullBuffer::new_null(len), + } + } +} + +impl RowSelection { + // Based on the boolean array outputs of the other vector here. + // We want to be careful when comparing things based on the infra for pushdown here. + pub(crate) fn refine(&mut self, matches: &BoolArray) -> &mut Self { + let matches = matches.boolean_buffer(); + + // If nothing matches, we return a new value to set to false here. 
+ if matches.count_set_bits() == 0 { + return self; + } + + // Use an internal BoolArray to perform the logic here. + // Once we have this setup, it might just work this way. + self + } +} + /// Convert a set of expressions that must all match into a single AND expression. /// /// # Returns /// /// If conversion is successful, the result will be a /// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction. -/// -/// Note that the set of operators must be provided here instead. -/// -/// # Simplification -/// -/// Simplification will occur as part of this process, so constant folding and similar optimizations -/// will be applied before returning the final expression. -fn make_simplified_conjunction(filters: &[Expr], schema: SchemaRef) -> DFResult { - let init = Box::new(Expr::Literal(ScalarValue::Boolean(Some(true)))); - let conjunction = filters.iter().fold(init, |conj, item| { - Box::new(Expr::BinaryExpr(BinaryExpr::new( - conj, - Operator::And, - Box::new(item.clone()), - ))) - }); - +fn make_simplified(expr: &Expr, schema: SchemaRef) -> DFResult { let schema = schema.to_dfschema_ref()?; // simplify the expression. @@ -218,72 +356,72 @@ fn make_simplified_conjunction(filters: &[Expr], schema: SchemaRef) -> DFResult< let context = SimplifyContext::new(&props).with_schema(schema); let simplifier = ExprSimplifier::new(context); - simplifier.simplify(*conjunction) + simplifier.simplify(expr.clone()) } /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. #[derive(Debug, Clone)] -struct VortexMemoryExec { +struct VortexScanExec { array: Array, - filter_expr: Option, - projection: Option>, + filter_exprs: Option>, + filter_projection: Option>, + scan_projection: Option>, plan_properties: PlanProperties, } -impl DisplayAs for VortexMemoryExec { +impl DisplayAs for VortexScanExec { fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { write!(f, "{:?}", self) } } -impl VortexMemoryExec { - /// Read a single array chunk from the source as a RecordBatch. - /// - /// `array` must be a [`StructArray`] or flatten into one. Passing a different Array variant - /// may cause a panic. - fn execute_single_chunk( - array: Array, - projection: &Option>, - _context: Arc, - ) -> DFResult { - let data = array +/// Read a single array chunk from the source as a RecordBatch. +/// +/// # Errors +/// This function will return an Error if `array` is not struct-typed. It will also return an +/// error if the projection references columns +fn execute_unfiltered( + array: &Array, + projection: &Option>, +) -> DFResult { + // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. + let struct_array = array + .clone() + .into_struct() + .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; + + let field_order = if let Some(projection) = projection { + projection.clone() + } else { + (0..struct_array.names().len()).collect() + }; + + let projected_struct = struct_array + .project(field_order.as_slice()) + .map_err(|vortex_err| { + exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") + })?; + let batch = RecordBatch::from( + projected_struct .into_canonical() - .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))? - .into_array(); - - // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. 
- let struct_array = StructArray::try_from(data).expect("array must be StructArray"); - - let field_order = if let Some(projection) = projection { - projection.clone() - } else { - (0..struct_array.names().len()).collect() - }; - - let projected_struct = - struct_array - .project(field_order.as_slice()) - .map_err(|vortex_err| { - exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") - })?; - let batch = RecordBatch::from( - projected_struct - .into_canonical() - .expect("struct arrays must flatten") - .into_arrow() - .as_any() - .downcast_ref::() - .expect("vortex StructArray must convert to arrow StructArray"), - ); - Ok(Box::pin(VortexRecordBatchStream { - schema_ref: batch.schema(), - inner: futures::stream::iter(vec![batch]), - })) - } + .expect("struct arrays must canonicalize") + .into_arrow() + .as_any() + .downcast_ref::() + .expect("vortex StructArray must convert to arrow StructArray"), + ); + Ok(Box::pin(VortexRecordBatchStream { + schema_ref: batch.schema(), + inner: futures::stream::iter(vec![batch]), + })) } +// Row selector stream. +// I.e., send a stream of RowSelector which allows us to pass in a bunch of binary arrays +// back down to the other systems here instead. + #[pin_project] -struct VortexRecordBatchStream { +pub(crate) struct VortexRecordBatchStream { schema_ref: SchemaRef, #[pin] @@ -315,7 +453,7 @@ where } } -impl ExecutionPlan for VortexMemoryExec { +impl ExecutionPlan for VortexScanExec { fn as_any(&self) -> &dyn Any { self } @@ -339,7 +477,7 @@ impl ExecutionPlan for VortexMemoryExec { fn execute( &self, partition: usize, - context: Arc, + _context: Arc, ) -> DFResult { let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { chunked_array @@ -349,7 +487,7 @@ impl ExecutionPlan for VortexMemoryExec { self.array.clone() }; - Self::execute_single_chunk(chunk, &self.projection, context) + execute_unfiltered(&chunk, &self.scan_projection) } } diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs new file mode 100644 index 0000000000..739465a6f0 --- /dev/null +++ b/vortex-datafusion/src/plans.rs @@ -0,0 +1,205 @@ +//! Physical operators needed to implement scanning of Vortex arrays with pushdown. + +use std::any::Any; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; + +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion_common::Result as DFResult; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_expr::Expr; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, +}; +use lazy_static::lazy_static; +use vortex::array::struct_::StructArray; + +/// Physical plan operator that applies a set of [filters][Expr] against the input, producing a +/// row mask that can be used downstream to force a take against the corresponding struct array +/// chunks but for different columns. +pub(crate) struct RowSelectorExec { + filter_exprs: Vec, + filter_projection: Vec, + + // cached PlanProperties object. We do not make use of this. + cached_plan_props: PlanProperties, + + // A Vortex struct array that contains all columns necessary for executing the filter + // expressions. + filter_struct: StructArray, +} + +lazy_static! 
{
+    static ref ROW_SELECTOR_SCHEMA_REF: SchemaRef = Arc::new(Schema::new(vec![Field::new(
+        "row_idx",
+        DataType::UInt64,
+        false
+    )]));
+}
+
+impl RowSelectorExec {
+    pub(crate) fn new(
+        filter_exprs: &Vec<Expr>,
+        filter_projection: &Vec<usize>,
+        filter_struct: &StructArray,
+    ) -> Self {
+        let cached_plan_props = PlanProperties::new(
+            EquivalenceProperties::new(ROW_SELECTOR_SCHEMA_REF.clone()),
+            Partitioning::RoundRobinBatch(1),
+            ExecutionMode::Bounded,
+        );
+
+        Self {
+            filter_exprs: filter_exprs.clone(),
+            filter_projection: filter_projection.clone(),
+            filter_struct: filter_struct.clone(),
+            cached_plan_props,
+        }
+    }
+}
+
+impl Debug for RowSelectorExec {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RowSelectorExec").finish()
+    }
+}
+
+impl DisplayAs for RowSelectorExec {
+    fn fmt_as(
+        &self,
+        _display_format_type: DisplayFormatType,
+        f: &mut Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "{:?}", self)
+    }
+}
+
+impl ExecutionPlan for RowSelectorExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cached_plan_props
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        // No children
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> DFResult<Arc<dyn ExecutionPlan>> {
+        panic!("with_new_children not supported for RowSelectorExec")
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> DFResult<SendableRecordBatchStream> {
+        assert_eq!(
+            partition, 0,
+            "single partitioning only supported by TakeOperator"
+        );
+
+        todo!("need to implement this")
+    }
+}
+
+/// Physical operator that receives a stream of row indices from a child operator, and uses that
+/// to perform a `take` operation on the backing Vortex array.
+pub(crate) struct TakeRowsExec {
+    plan_properties: PlanProperties,
+
+    // Array storing the indices used to take the plan nodes.
+    projection: Vec<usize>,
+
+    // Input plan, a stream of indices on which we perform a take against the original dataset.
+    input: Arc<dyn ExecutionPlan>,
+
+    output_schema: SchemaRef,
+
+    // A record batch holding the fields that were relevant to executing the upstream filter expression.
+    // These fields have already been decoded, so we hold them separately and "paste" them together
+    // with the fields we decode from `table` below.
+    filter_struct: RecordBatch,
+
+    // The original Vortex array holding the fields we have not decoded yet.
+ table: StructArray, +} + +impl TakeRowsExec { + pub(crate) fn new( + schema_ref: SchemaRef, + projection: &Vec, + row_indices: Arc, + output_schema: SchemaRef, + table: StructArray, + ) -> Self { + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(schema_ref.clone()), + Partitioning::RoundRobinBatch(1), + ExecutionMode::Bounded, + ); + + Self { + plan_properties, + projection: projection.clone(), + input: row_indices, + output_schema: output_schema.clone(), + filter_struct: RecordBatch::new_empty(output_schema.clone()), + table, + } + } +} + +impl Debug for TakeRowsExec { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Take").finish() + } +} + +impl DisplayAs for TakeRowsExec { + fn fmt_as(&self, _display_type: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl ExecutionPlan for TakeRowsExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn properties(&self) -> &PlanProperties { + &self.plan_properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.input] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> DFResult> { + panic!("unsupported with_new_children for {:?}", &self) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> DFResult { + assert_eq!( + partition, 0, + "single partitioning only supported by TakeOperator" + ); + + todo!() + } +} From abb2e98db97606b46929e547084d90775c8319d3 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Jun 2024 17:45:41 -0400 Subject: [PATCH 03/16] implement RowSelectorExec --- vortex-array/src/stats/statsset.rs | 1 - vortex-datafusion/src/datatype.rs | 8 ++ vortex-datafusion/src/expr.rs | 58 ++++++++++ vortex-datafusion/src/lib.rs | 67 +---------- vortex-datafusion/src/plans.rs | 171 ++++++++++++++++++++++++++++- 5 files changed, 236 insertions(+), 69 deletions(-) create mode 100644 vortex-datafusion/src/expr.rs diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index 02c32b78ca..a2f9e85748 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -3,7 +3,6 @@ use std::collections::HashMap; use enum_iterator::all; use itertools::Itertools; - use vortex_dtype::DType; use vortex_error::VortexError; use vortex_scalar::Scalar; diff --git a/vortex-datafusion/src/datatype.rs b/vortex-datafusion/src/datatype.rs index e53fb27f89..c4a0296b72 100644 --- a/vortex-datafusion/src/datatype.rs +++ b/vortex-datafusion/src/datatype.rs @@ -13,6 +13,14 @@ use arrow_schema::{DataType, Field, FieldRef, Fields, Schema, SchemaBuilder}; use vortex_dtype::{DType, Nullability, PType}; +/// Convert a Vortex [struct DType][DType] to an Arrow [Schema]. +/// +/// To avoid ambiguity, Vortex types are mapped to the [DataType] that +/// +/// # Panics +/// +/// This function will panic if the provided `dtype` is not a StructDType, or if the struct DType +/// has top-level nullability. 
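+///
+/// For example, a non-nullable struct dtype of shape `{a: i64, b: utf8}` is expected to map
+/// to an Arrow schema with fields `a: Int64` and `b: Utf8`, carrying each child dtype's
+/// nullability over to the corresponding Arrow field.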
pub(crate) fn infer_schema(dtype: &DType) -> Schema { let DType::Struct(struct_dtype, nullable) = dtype else { panic!("only DType::Struct can be converted to arrow schema"); diff --git a/vortex-datafusion/src/expr.rs b/vortex-datafusion/src/expr.rs new file mode 100644 index 0000000000..62e348498d --- /dev/null +++ b/vortex-datafusion/src/expr.rs @@ -0,0 +1,58 @@ +use arrow_schema::SchemaRef; +use datafusion::optimizer::simplify_expressions::ExprSimplifier; +use datafusion_common::{Result as DFResult, ToDFSchema}; +use datafusion_expr::execution_props::ExecutionProps; +use datafusion_expr::simplify::SimplifyContext; +use datafusion_expr::{and, lit, Expr}; + +/// Convert a set of expressions into a single AND expression. +/// +/// # Returns +/// +/// If conversion is successful, the result will be a +/// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction. +pub(crate) fn make_conjunction(exprs: impl AsRef<[Expr]>) -> DFResult { + Ok(exprs + .as_ref() + .iter() + .fold(lit(true), |conj, elem| and(conj, elem.clone()))) +} + +/// Simplify an expression using DataFusion's builtin analysis passes. +/// +/// This encapsulates common optimizations like constant folding and eliminating redundant +/// expressions, e.g. `value AND true`. +pub(crate) fn simplify_expr(expr: &Expr, schema: SchemaRef) -> DFResult { + let schema = schema.to_dfschema_ref()?; + + let props = ExecutionProps::new(); + let context = SimplifyContext::new(&props).with_schema(schema); + let simplifier = ExprSimplifier::new(context); + + simplifier.simplify(expr.clone()) +} + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_schema::{DataType, Field, Schema}; + use datafusion_expr::{col, lit}; + + use super::*; + + #[test] + fn test_conjunction_simplify() { + let schema = Arc::new(Schema::new(vec![ + Field::new("int_col", DataType::Int32, false), + Field::new("bool_col", DataType::Boolean, false), + ])); + + let exprs = vec![col("int_col").gt_eq(lit(4)), col("bool_col").is_true()]; + + assert_eq!( + simplify_expr(&make_conjunction(&exprs).unwrap(), schema).unwrap(), + and(col("int_col").gt_eq(lit(4)), col("bool_col").is_true()) + ); + } +} diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index 00adf82759..f72981e84c 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -10,17 +10,13 @@ use std::task::{Context, Poll}; use arrow_array::{RecordBatch, StructArray as ArrowStructArray}; use arrow_schema::SchemaRef; use async_trait::async_trait; -use datafusion::arrow::buffer::NullBuffer; use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; -use datafusion::optimizer::simplify_expressions::ExprSimplifier; use datafusion::prelude::SessionContext; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult, ToDFSchema}; -use datafusion_expr::execution_props::ExecutionProps; -use datafusion_expr::simplify::SimplifyContext; use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ @@ -29,7 +25,6 @@ use datafusion_physical_plan::{ use futures::{Stream, StreamExt}; use itertools::Itertools; use pin_project::pin_project; -use vortex::array::bool::BoolArray; use 
vortex::array::chunked::ChunkedArray; use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; use vortex_dtype::DType; @@ -38,6 +33,7 @@ use vortex_error::{vortex_bail, VortexResult}; use crate::datatype::infer_schema; mod datatype; +mod expr; mod plans; pub trait SessionContextExt { @@ -144,8 +140,8 @@ impl TableProvider for VortexInMemoryTableProvider { match (filter_exprs, filter_projection) { // If there is a filter expression, we execute in two phases, first performing a filter - // on the input to get back row indices, and then taking the remaning struct columns - // using the calculcated indices from the filter. + // on the input to get back row indices, and then taking the remaining struct columns + // using the calculated indices from the filter. (Some(filter_exprs), Some(filter_projection)) => Ok(make_filter_then_take_plan( self.schema_ref.clone(), filter_exprs, @@ -302,63 +298,6 @@ fn get_column_references(expr: &Expr) -> HashSet { references } -/// A mask determining the rows in an Array that should be treated as valid for query processing. -/// The vector is used to determine the take order of a set of things, or otherwise we determine -/// that we want to perform cross-filtering of the larger columns, if we so choose. -pub(crate) struct RowSelection { - selection: NullBuffer, -} - -impl RowSelection { - /// Construct a new RowSelection with all elements initialized to selected (true). - pub(crate) fn new_selected(len: usize) -> Self { - Self { - selection: NullBuffer::new_valid(len), - } - } - - /// Construct a new RowSelection with all elements initialized to unselected (false). - pub(crate) fn new_unselected(len: usize) -> Self { - Self { - selection: NullBuffer::new_null(len), - } - } -} - -impl RowSelection { - // Based on the boolean array outputs of the other vector here. - // We want to be careful when comparing things based on the infra for pushdown here. - pub(crate) fn refine(&mut self, matches: &BoolArray) -> &mut Self { - let matches = matches.boolean_buffer(); - - // If nothing matches, we return a new value to set to false here. - if matches.count_set_bits() == 0 { - return self; - } - - // Use an internal BoolArray to perform the logic here. - // Once we have this setup, it might just work this way. - self - } -} - -/// Convert a set of expressions that must all match into a single AND expression. -/// -/// # Returns -/// -/// If conversion is successful, the result will be a -/// [binary expression node][datafusion_expr::Expr::BinaryExpr] containing the conjunction. -fn make_simplified(expr: &Expr, schema: SchemaRef) -> DFResult { - let schema = schema.to_dfschema_ref()?; - - // simplify the expression. - let props = ExecutionProps::new(); - let context = SimplifyContext::new(&props).with_schema(schema); - let simplifier = ExprSimplifier::new(context); - - simplifier.simplify(expr.clone()) -} - /// Physical plan node for scans against an in-memory, possibly chunked Vortex Array. 
#[derive(Debug, Clone)] struct VortexScanExec { diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 739465a6f0..4e201c7793 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -2,19 +2,29 @@ use std::any::Any; use std::fmt::{Debug, Formatter}; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; +use std::task::{Context, Poll}; -use arrow_array::RecordBatch; +use arrow_array::cast::AsArray; +use arrow_array::{ArrayRef, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; -use datafusion_common::Result as DFResult; -use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::prelude::SessionContext; +use datafusion_common::{DFSchema, Result as DFResult}; +use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion_expr::Expr; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, }; +use futures::{ready, Stream}; use lazy_static::lazy_static; use vortex::array::struct_::StructArray; +use vortex::{ArrayDType, IntoCanonical}; + +use crate::datatype::infer_schema; +use crate::expr::{make_conjunction, simplify_expr}; /// Physical plan operator that applies a set of [filters][Expr] against the input, producing a /// row mask that can be used downstream to force a take against the corresponding struct array @@ -107,7 +117,103 @@ impl ExecutionPlan for RowSelectorExec { "single partitioning only supported by TakeOperator" ); - todo!("need to implement this") + let stream_schema = Arc::new(infer_schema(self.filter_struct.dtype())); + + let filter_struct = self.filter_struct.clone(); + let inner = Box::pin(async move { + let arrow_array = filter_struct.into_canonical().unwrap().into_arrow(); + Ok(RecordBatch::from(arrow_array.as_struct())) + }); + + let conjunction_expr = simplify_expr( + &make_conjunction(&self.filter_exprs)?, + stream_schema.clone(), + )?; + + Ok(Box::pin(RowIndicesStream { + inner, + polled_inner: false, + conjunction_expr, + schema_ref: stream_schema, + })) + } +} + +/// [RecordBatchStream] of row indices, emitted by the [RowSelectorExec] physical plan node. +#[pin_project::pin_project] +struct RowIndicesStream { + /// The inner future that returns `DFResult`. + #[pin] + inner: F, + + polled_inner: bool, + + conjunction_expr: Expr, + schema_ref: SchemaRef, +} + +impl Stream for RowIndicesStream +where + F: Future>, +{ + type Item = DFResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + println!("BEGIN poll_next"); + // Get access to a single record batch of values in the upstream system. + let this = self.project(); + + if *this.polled_inner { + println!("EXIT"); + return Poll::Ready(None); + } + + // Get the unfiltered record batch. + // Since this is a one-shot, we only want to poll the inner future once, to create the + // initial batch for us to process. + // + // We want to avoid ever calling it again. + println!("POLL record_batch"); + let record_batch = ready!(this.inner.poll(cx))?; + *this.polled_inner = true; + + // Using a local SessionContext, generate a physical plan to execute the conjunction query + // against the filter columns. + // + // The result of a conjunction expression is a BooleanArray containing `true` for rows + // where the conjunction was satisfied, and `false` otherwise. 
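+        //
+        // For example (mirroring the unit test at the bottom of this file): with columns
+        // a = [0, 1, 2], b = [false, false, true] and the conjunction
+        // `a % 2 = 0 AND b IS TRUE`, evaluation yields [false, false, true], which is
+        // converted below into a single-column record batch of row indices: [2].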
+ println!("CREATE session"); + let session = SessionContext::new(); + let df_schema = DFSchema::try_from(this.schema_ref.clone())?; + let physical_expr = + session.create_physical_expr(this.conjunction_expr.clone(), &df_schema)?; + let selection = physical_expr + .evaluate(&record_batch)? + .into_array(record_batch.num_rows())?; + + // Convert the `selection` BooleanArray into a UInt64Array of indices. + let selection_indices: Vec = selection + .as_boolean() + .clone() + .values() + .set_indices() + .map(|idx| idx as u64) + .collect(); + + let indices: ArrayRef = Arc::new(UInt64Array::from(selection_indices)); + let indices_batch = RecordBatch::try_new(ROW_SELECTOR_SCHEMA_REF.clone(), vec![indices])?; + + println!("RETURNING Poll::Ready"); + Poll::Ready(Some(Ok(indices_batch))) + } +} + +impl RecordBatchStream for RowIndicesStream +where + F: Future>, +{ + fn schema(&self) -> SchemaRef { + self.schema_ref.clone() } } @@ -203,3 +309,60 @@ impl ExecutionPlan for TakeRowsExec { todo!() } } + +#[cfg(test)] +mod test { + use std::sync::Arc; + + use arrow_array::{BooleanArray, RecordBatch, UInt64Array}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_expr::{and, col, lit}; + use itertools::Itertools; + + use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; + + #[tokio::test] + async fn test_filtering_stream() { + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::UInt64, false), + Field::new("b", DataType::Boolean, false), + ])); + + let _schema = schema.clone(); + let inner = Box::pin(async move { + Ok(RecordBatch::try_new( + _schema, + vec![ + Arc::new(UInt64Array::from(vec![0u64, 1, 2])), + Arc::new(BooleanArray::from(vec![false, false, true])), + ], + ) + .unwrap()) + }); + + let _schema = schema.clone(); + let filtering_stream = RowIndicesStream { + inner, + polled_inner: false, + conjunction_expr: and((col("a") % lit(2)).eq(lit(0)), col("b").is_true()), + schema_ref: _schema, + }; + + let rows: Vec = futures::executor::block_on_stream(filtering_stream) + .try_collect() + .unwrap(); + + assert_eq!(rows.len(), 1); + + // The output of row selection is a RecordBatch of indices that can be used as selectors + // against the original RecordBatch. + assert_eq!( + rows[0], + RecordBatch::try_new( + ROW_SELECTOR_SCHEMA_REF.clone(), + vec![Arc::new(UInt64Array::from(vec![2u64])),] + ) + .unwrap() + ); + } +} From 1536dba2538099d6cc0caf5adebfd09ec8a62cab Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Jun 2024 20:01:42 -0400 Subject: [PATCH 04/16] implement operators --- vortex-array/src/array/struct_/mod.rs | 2 +- vortex-datafusion/Cargo.toml | 5 +- vortex-datafusion/src/lib.rs | 70 +++++++------ vortex-datafusion/src/plans.rs | 139 ++++++++++++++++++++++---- 4 files changed, 160 insertions(+), 56 deletions(-) diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index 7ffa0ceda2..cac5fecb49 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -128,7 +128,7 @@ impl StructArray { /// copying. /// /// This function will return an error if the projection includes invalid column IDs. 
- pub fn project(self, projection: &[usize]) -> VortexResult { + pub fn project(&self, projection: &[usize]) -> VortexResult { let mut children = Vec::with_capacity(projection.len()); let mut names = Vec::with_capacity(projection.len()); diff --git a/vortex-datafusion/Cargo.toml b/vortex-datafusion/Cargo.toml index f013abbf5c..615171e3f4 100644 --- a/vortex-datafusion/Cargo.toml +++ b/vortex-datafusion/Cargo.toml @@ -35,6 +35,5 @@ pin-project = { workspace = true } [dev-dependencies] tokio = { workspace = true, features = ["test-util"] } -# TODO(aduffy): re-enable -#[lints] -#workspace = true +[lints] +workspace = true diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index f72981e84c..c18bda8c1d 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -12,11 +12,11 @@ use arrow_schema::SchemaRef; use async_trait::async_trait; use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; -use datafusion::execution::context::SessionState; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::execution::context::SessionState; use datafusion::prelude::SessionContext; +use datafusion_common::{DataFusionError, exec_datafusion_err, Result as DFResult}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; -use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult, ToDFSchema}; use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ @@ -25,12 +25,15 @@ use datafusion_physical_plan::{ use futures::{Stream, StreamExt}; use itertools::Itertools; use pin_project::pin_project; -use vortex::array::chunked::ChunkedArray; + use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; +use vortex::array::chunked::ChunkedArray; +use vortex::array::struct_::StructArray; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use crate::datatype::infer_schema; +use crate::plans::{RowSelectorExec, TakeRowsExec}; mod datatype; mod expr; @@ -119,7 +122,7 @@ impl TableProvider for VortexInMemoryTableProvider { let filter_exprs: Option> = if filters.is_empty() { None } else { - Some(filters.iter().cloned().collect()) + Some(filters.to_vec()) }; let filter_projection = filter_exprs @@ -138,6 +141,11 @@ impl TableProvider for VortexInMemoryTableProvider { ExecutionMode::Bounded, ); + let output_projection: Vec = match projection { + None => (0..self.schema_ref.fields().len()).collect(), + Some(proj) => proj.clone(), + }; + match (filter_exprs, filter_projection) { // If there is a filter expression, we execute in two phases, first performing a filter // on the input to get back row indices, and then taking the remaining struct columns @@ -147,17 +155,14 @@ impl TableProvider for VortexInMemoryTableProvider { filter_exprs, filter_projection, self.array.clone(), - projection.clone(), - plan_properties, + output_projection.clone(), )), // If no filters were pushed down, we materialize the entire StructArray into a // RecordBatch and let DataFusion process the entire query. 
_ => Ok(Arc::new(VortexScanExec { array: self.array.clone(), - filter_exprs: None, - filter_projection: None, - scan_projection: projection.cloned(), + scan_projection: output_projection.clone(), plan_properties, })), } @@ -178,7 +183,7 @@ impl TableProvider for VortexInMemoryTableProvider { filters .iter() .map(|expr| { - if can_be_pushed_down(*expr, &schema_columns)? { + if can_be_pushed_down(expr, &schema_columns)? { Ok(TableProviderFilterPushDown::Exact) } else { Ok(TableProviderFilterPushDown::Unsupported) @@ -196,16 +201,26 @@ impl TableProvider for VortexInMemoryTableProvider { /// The second stage receives the row selection above and dispatches a `take` on the remaining /// columns. fn make_filter_then_take_plan( - _schema: SchemaRef, - _filter_exprs: Vec, - _filter_projection: Vec, - _array: Array, - _output_projection: Option<&Vec>, - _plan_properties: PlanProperties, + schema: SchemaRef, + filter_exprs: Vec, + filter_projection: Vec, + array: Array, + output_projection: Vec, ) -> Arc { - // Create a struct array necessary to run the filter operations. + let struct_array = StructArray::try_from(array).unwrap(); + + let filter_struct = struct_array + .project(filter_projection.as_slice()) + .expect("projecting filter struct"); - todo!() + let row_selector_op = Arc::new(RowSelectorExec::new(&filter_exprs, &filter_struct)); + + Arc::new(TakeRowsExec::new( + schema.clone(), + &output_projection, + row_selector_op.clone(), + &struct_array, + )) } /// Check if the given expression tree can be pushed down into the scan. @@ -213,7 +228,7 @@ fn can_be_pushed_down(expr: &Expr, schema_columns: &HashSet) -> DFResult // If the filter references a column not known to our schema, we reject the filter for pushdown. // TODO(aduffy): is this necessary? Under what conditions would this happen? let column_refs = get_column_references(expr); - if !column_refs.is_subset(&schema_columns) { + if !column_refs.is_subset(schema_columns) { return Ok(false); } @@ -302,9 +317,7 @@ fn get_column_references(expr: &Expr) -> HashSet { #[derive(Debug, Clone)] struct VortexScanExec { array: Array, - filter_exprs: Option>, - filter_projection: Option>, - scan_projection: Option>, + scan_projection: Vec, plan_properties: PlanProperties, } @@ -321,7 +334,7 @@ impl DisplayAs for VortexScanExec { /// error if the projection references columns fn execute_unfiltered( array: &Array, - projection: &Option>, + projection: &Vec, ) -> DFResult { // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. 
let struct_array = array @@ -329,14 +342,8 @@ fn execute_unfiltered( .into_struct() .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; - let field_order = if let Some(projection) = projection { - projection.clone() - } else { - (0..struct_array.names().len()).collect() - }; - let projected_struct = struct_array - .project(field_order.as_slice()) + .project(projection.as_slice()) .map_err(|vortex_err| { exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") })?; @@ -436,11 +443,12 @@ mod test { use datafusion::arrow::array::AsArray; use datafusion::prelude::SessionContext; use datafusion_expr::{col, count_distinct, lit}; + use vortex::array::primitive::PrimitiveArray; use vortex::array::struct_::StructArray; use vortex::array::varbin::VarBinArray; - use vortex::validity::Validity; use vortex::IntoArray; + use vortex::validity::Validity; use vortex_dtype::{DType, Nullability}; use crate::SessionContextExt; diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 4e201c7793..87d76ab6f6 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use arrow_array::cast::AsArray; +use arrow_array::types::UInt64Type; use arrow_array::{ArrayRef, RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::prelude::SessionContext; @@ -20,8 +21,11 @@ use datafusion_physical_plan::{ }; use futures::{ready, Stream}; use lazy_static::lazy_static; +use pin_project::pin_project; use vortex::array::struct_::StructArray; -use vortex::{ArrayDType, IntoCanonical}; +use vortex::arrow::FromArrowArray; +use vortex::compute::take::take; +use vortex::{ArrayDType, ArrayData, IntoArray, IntoCanonical}; use crate::datatype::infer_schema; use crate::expr::{make_conjunction, simplify_expr}; @@ -31,7 +35,6 @@ use crate::expr::{make_conjunction, simplify_expr}; /// chunks but for different columns. pub(crate) struct RowSelectorExec { filter_exprs: Vec, - filter_projection: Vec, // cached PlanProperties object. We do not make use of this. cached_plan_props: PlanProperties, @@ -51,8 +54,7 @@ lazy_static! { impl RowSelectorExec { pub(crate) fn new( - filter_exprs: &Vec, - filter_projection: &Vec, + filter_exprs: &[Expr], filter_struct: &StructArray, ) -> Self { let cached_plan_props = PlanProperties::new( @@ -62,8 +64,7 @@ impl RowSelectorExec { ); Self { - filter_exprs: filter_exprs.clone(), - filter_projection: filter_projection.clone(), + filter_exprs: filter_exprs.to_owned(), filter_struct: filter_struct.clone(), cached_plan_props, } @@ -141,7 +142,7 @@ impl ExecutionPlan for RowSelectorExec { /// [RecordBatchStream] of row indices, emitted by the [RowSelectorExec] physical plan node. #[pin_project::pin_project] -struct RowIndicesStream { +pub(crate) struct RowIndicesStream { /// The inner future that returns `DFResult`. #[pin] inner: F, @@ -152,6 +153,20 @@ struct RowIndicesStream { schema_ref: SchemaRef, } +impl RowIndicesStream +where + F: Future>, +{ + pub fn new(record_batch_fut: F, conjunction_expr: Expr, schema_ref: SchemaRef) -> Self { + Self { + inner: record_batch_fut, + polled_inner: false, + conjunction_expr, + schema_ref, + } + } +} + impl Stream for RowIndicesStream where F: Future>, @@ -160,9 +175,10 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { println!("BEGIN poll_next"); - // Get access to a single record batch of values in the upstream system. 
let this = self.project(); + // If we have already polled the one-shot future with the filter records, indicate + // that the stream has finished. if *this.polled_inner { println!("EXIT"); return Poll::Ready(None); @@ -230,11 +246,6 @@ pub(crate) struct TakeRowsExec { output_schema: SchemaRef, - // A record batch holding the fields that were relevant to executing the upstream filter expression. - // These fields have already been decoded, so we hold them separately and "paste" them together - // with the fields we decode from `table` below. - filter_struct: RecordBatch, - // The original Vortex array holding the fields we have not decoded yet. table: StructArray, } @@ -242,10 +253,9 @@ pub(crate) struct TakeRowsExec { impl TakeRowsExec { pub(crate) fn new( schema_ref: SchemaRef, - projection: &Vec, + projection: &[usize], row_indices: Arc, - output_schema: SchemaRef, - table: StructArray, + table: &StructArray, ) -> Self { let plan_properties = PlanProperties::new( EquivalenceProperties::new(schema_ref.clone()), @@ -253,13 +263,14 @@ impl TakeRowsExec { ExecutionMode::Bounded, ); + let output_schema = Arc::new(schema_ref.project(projection).unwrap()); + Self { plan_properties, - projection: projection.clone(), + projection: projection.to_owned(), input: row_indices, output_schema: output_schema.clone(), - filter_struct: RecordBatch::new_empty(output_schema.clone()), - table, + table: table.clone(), } } } @@ -299,14 +310,100 @@ impl ExecutionPlan for TakeRowsExec { fn execute( &self, partition: usize, - _context: Arc, + context: Arc, ) -> DFResult { assert_eq!( partition, 0, "single partitioning only supported by TakeOperator" ); - todo!() + let row_indices_stream = self.input.execute(partition, context)?; + + Ok(Box::pin(TakeRowsStream { + row_indices_stream, + completed: false, + output_projection: self.projection.clone(), + output_schema: self.output_schema.clone(), + vortex_array: self.table.clone(), + })) + } +} + +/// Stream of outputs emitted by the [TakeRowsExec] physical operator. +#[pin_project] +pub(crate) struct TakeRowsStream { + // Stream of row indices arriving from upstream operator. + #[pin] + row_indices_stream: F, + + completed: bool, + + // Projection based on the schema here + output_projection: Vec, + output_schema: SchemaRef, + + // The original Vortex array we're taking from + vortex_array: StructArray, +} + +impl Stream for TakeRowsStream +where + F: Stream>, +{ + type Item = DFResult; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + // If `poll_next` has already fired, return None indicating end of the stream. + if *this.completed { + return Poll::Ready(None); + } + + // Get the indices provided by the upstream operator. + let record_batch = match ready!(this.row_indices_stream.poll_next(cx)) { + None => { + // Row indices stream is complete, we are also complete. + // This should never happen right now given we only emit one recordbatch upstream. + return Poll::Ready(None); + } + Some(result) => { + *this.completed = true; + result? + } + }; + + let row_indices = + ArrayData::from_arrow(record_batch.column(0).as_primitive::(), false) + .into_array(); + + // Assemble the output columns using the row indices. + // NOTE(aduffy): this re-decodes the fields from the filter schema, which is unnecessary. 
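+        // One way to avoid the re-decode (a sketch, not implemented in this patch): have the
+        // filter stage emit its already-materialized Arrow columns alongside the row indices,
+        // splice those in here, and run `take` only over the columns that did not participate
+        // in the filter.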
+ let mut columns = Vec::new(); + for field_idx in this.output_projection { + let encoded = this.vortex_array.field(*field_idx).unwrap(); + let decoded = take(&encoded, &row_indices) + .unwrap() + .into_canonical() + .unwrap() + .into_arrow(); + + columns.push(decoded); + } + + // Send back a single record batch of the decoded data + let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns).unwrap(); + + Poll::Ready(Some(Ok(output_batch))) + } +} + +impl RecordBatchStream for TakeRowsStream +where + F: Stream>, +{ + fn schema(&self) -> SchemaRef { + self.output_schema.clone() } } From c51d7065cae5dca48915ee933b8028b3a89f9679 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 25 Jun 2024 20:14:44 -0400 Subject: [PATCH 05/16] couple fixes --- vortex-datafusion/src/lib.rs | 77 +++++++++++++++++++++++++--------- vortex-datafusion/src/plans.rs | 5 +-- 2 files changed, 58 insertions(+), 24 deletions(-) diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index c18bda8c1d..e52a819688 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -12,11 +12,11 @@ use arrow_schema::SchemaRef; use async_trait::async_trait; use datafusion::dataframe::DataFrame; use datafusion::datasource::TableProvider; -use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::execution::context::SessionState; +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::prelude::SessionContext; -use datafusion_common::{DataFusionError, exec_datafusion_err, Result as DFResult}; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion_common::{exec_datafusion_err, DataFusionError, Result as DFResult}; use datafusion_expr::{Expr, Operator, TableProviderFilterPushDown, TableType}; use datafusion_physical_expr::EquivalenceProperties; use datafusion_physical_plan::{ @@ -25,12 +25,11 @@ use datafusion_physical_plan::{ use futures::{Stream, StreamExt}; use itertools::Itertools; use pin_project::pin_project; - -use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; use vortex::array::chunked::ChunkedArray; use vortex::array::struct_::StructArray; +use vortex::{Array, ArrayDType, IntoArrayVariant, IntoCanonical}; use vortex_dtype::DType; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use crate::datatype::infer_schema; use crate::plans::{RowSelectorExec, TakeRowsExec}; @@ -39,18 +38,40 @@ mod datatype; mod expr; mod plans; +/// Optional configurations to pass when loading a [VortexMemTable]. 
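+///
+/// Setting `disable_pushdown` makes the provider report every filter as `Unsupported`, leaving
+/// predicate evaluation to DataFusion itself; this is mainly useful as a benchmarking baseline.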
+#[derive(Default, Debug, Clone)] +pub struct VortexMemTableOptions { + pub disable_pushdown: bool, +} + +impl VortexMemTableOptions { + pub fn with_disable_pushdown(&mut self, disable_pushdown: bool) -> &mut Self { + self.disable_pushdown = disable_pushdown; + self + } +} + pub trait SessionContextExt { - fn read_vortex(&self, array: Array) -> DFResult; + fn read_vortex(&self, array: Array) -> DFResult { + self.read_vortex_opts(array, VortexMemTableOptions::default()) + } + + fn read_vortex_opts(&self, array: Array, options: VortexMemTableOptions) + -> DFResult; } impl SessionContextExt for SessionContext { - fn read_vortex(&self, array: Array) -> DFResult { + fn read_vortex_opts( + &self, + array: Array, + options: VortexMemTableOptions, + ) -> DFResult { assert!( matches!(array.dtype(), DType::Struct(_, _)), "Vortex arrays must have struct type" ); - let vortex_table = VortexInMemoryTableProvider::try_new(array) + let vortex_table = VortexMemTable::try_new(array, options) .map_err(|error| DataFusionError::Internal(format!("vortex error: {error}")))?; self.read_table(Arc::new(vortex_table)) @@ -62,27 +83,32 @@ impl SessionContextExt for SessionContext { /// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as /// a table to DataFusion. #[derive(Debug, Clone)] -pub(crate) struct VortexInMemoryTableProvider { +pub(crate) struct VortexMemTable { array: Array, schema_ref: SchemaRef, + options: VortexMemTableOptions, } -impl VortexInMemoryTableProvider { +impl VortexMemTable { /// Build a new table provider from an existing [struct type](vortex_dtype::StructDType) array. - pub fn try_new(array: Array) -> VortexResult { - if !matches!(array.dtype(), DType::Struct(_, _)) { - vortex_bail!(InvalidArgument: "only DType::Struct arrays can produce a table provider"); - } - + /// + /// # Panics + /// + /// Creation will panic if the provided array is not of `DType::Struct` type. + pub fn try_new(array: Array, options: VortexMemTableOptions) -> VortexResult { let arrow_schema = infer_schema(array.dtype()); let schema_ref = SchemaRef::new(arrow_schema); - Ok(Self { array, schema_ref }) + Ok(Self { + array, + schema_ref, + options, + }) } } #[async_trait] -impl TableProvider for VortexInMemoryTableProvider { +impl TableProvider for VortexMemTable { fn as_any(&self) -> &dyn Any { self } @@ -172,7 +198,19 @@ impl TableProvider for VortexInMemoryTableProvider { &self, filters: &[&Expr], ) -> DFResult> { - // Get the set of column filters supported. + // In the case the caller has configured this provider with filter pushdown disabled, + // do not attempt to apply any filters at scan time. + if self.options.disable_pushdown { + return Ok(filters + .iter() + .map(|_| TableProviderFilterPushDown::Unsupported) + .collect()); + } + + // Verify for each filter whether its expression tree consists solely of operations we know + // how to pushdown, and that they only reference columns in our source. + // TODO(aduffy): figure out if we actually need to do this, or what guarantees about the + // filters that DataFusion provides. 
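+        // Note that answering `Exact` tells DataFusion it may drop its own FilterExec for that
+        // expression, so anything accepted here must be evaluated in full by the pushdown plan.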
let schema_columns: HashSet = self .schema_ref .fields @@ -443,12 +481,11 @@ mod test { use datafusion::arrow::array::AsArray; use datafusion::prelude::SessionContext; use datafusion_expr::{col, count_distinct, lit}; - use vortex::array::primitive::PrimitiveArray; use vortex::array::struct_::StructArray; use vortex::array::varbin::VarBinArray; - use vortex::IntoArray; use vortex::validity::Validity; + use vortex::IntoArray; use vortex_dtype::{DType, Nullability}; use crate::SessionContextExt; diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 87d76ab6f6..f4906afe6a 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -53,10 +53,7 @@ lazy_static! { } impl RowSelectorExec { - pub(crate) fn new( - filter_exprs: &[Expr], - filter_struct: &StructArray, - ) -> Self { + pub(crate) fn new(filter_exprs: &[Expr], filter_struct: &StructArray) -> Self { let cached_plan_props = PlanProperties::new( EquivalenceProperties::new(ROW_SELECTOR_SCHEMA_REF.clone()), Partitioning::RoundRobinBatch(1), From 3982bb7b860cc4546b0ad5395ba3d2cafddb13e3 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 00:44:12 -0400 Subject: [PATCH 06/16] benchmarks, fixes found while doing benchmarks --- Cargo.lock | 4 + bench-vortex/Cargo.toml | 8 + bench-vortex/benches/datafusion_benchmark.rs | 160 +++++++++++++++++++ vortex-datafusion/src/lib.rs | 61 +++++-- vortex-datafusion/src/plans.rs | 49 ++++-- 5 files changed, 256 insertions(+), 26 deletions(-) create mode 100644 bench-vortex/benches/datafusion_benchmark.rs diff --git a/Cargo.lock b/Cargo.lock index 3fd8d68809..ce03c9eb3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,11 +421,13 @@ name = "bench-vortex" version = "0.1.0" dependencies = [ "arrow-array", + "arrow-schema", "arrow-select", "bytes", "bzip2", "criterion", "csv", + "datafusion", "enum-iterator", "flexbuffers", "futures", @@ -435,6 +437,7 @@ dependencies = [ "log", "mimalloc", "parquet", + "rand", "reqwest", "serde", "simplelog", @@ -443,6 +446,7 @@ dependencies = [ "vortex-alp", "vortex-array", "vortex-buffer", + "vortex-datafusion", "vortex-datetime-parts", "vortex-dict", "vortex-dtype", diff --git a/bench-vortex/Cargo.toml b/bench-vortex/Cargo.toml index cb222976d9..b314f0dcf4 100644 --- a/bench-vortex/Cargo.toml +++ b/bench-vortex/Cargo.toml @@ -16,10 +16,12 @@ workspace = true [dependencies] arrow-array = { workspace = true } +arrow-schema = { workspace = true } arrow-select = { workspace = true } bytes = { workspace = true } bzip2 = { workspace = true } csv = { workspace = true } +datafusion = { workspace = true } enum-iterator = { workspace = true } flexbuffers = { workspace = true } futures = { workspace = true } @@ -29,6 +31,7 @@ lazy_static = { workspace = true } log = { workspace = true } mimalloc = { workspace = true } parquet = { workspace = true, features = [] } +rand = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } simplelog = { workspace = true } @@ -37,6 +40,7 @@ uuid = { workspace = true, features = ["v4"] } vortex-alp = { path = "../encodings/alp" } vortex-array = { path = "../vortex-array" } vortex-buffer = { path = "../vortex-buffer" } +vortex-datafusion = { path = "../vortex-datafusion" } vortex-datetime-parts = { path = "../encodings/datetime-parts" } vortex-dict = { path = "../encodings/dict" } vortex-dtype = { path = "../vortex-dtype" } @@ -56,3 +60,7 @@ harness = false [[bench]] name = "random_access" harness = false + +[[bench]] +name = "datafusion_benchmark" +harness = 
false diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs new file mode 100644 index 0000000000..1e9204039c --- /dev/null +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -0,0 +1,160 @@ +use std::sync::Arc; + +use arrow_array::builder::{StringBuilder, UInt32Builder}; +use arrow_array::RecordBatch; +use arrow_schema::{DataType, Field, Schema}; +use criterion::{black_box, criterion_group, criterion_main, Criterion}; +use datafusion::datasource::MemTable; +use datafusion::functions_aggregate::expr_fn::sum; +use datafusion::logical_expr::lit; +use datafusion::prelude::{col, SessionContext}; +use lazy_static::lazy_static; +use vortex::compress::Compressor; +use vortex::encoding::EncodingRef; +use vortex::{Array, Context, IntoArray, ToArrayData}; +use vortex_datafusion::{VortexMemTable, VortexMemTableOptions}; +use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; + +lazy_static! { + pub static ref CTX: Context = Context::default().with_encodings([ + &BitPackedEncoding as EncodingRef, + // &DictEncoding, + &FoREncoding, + &DeltaEncoding, + ]); +} + +fn toy_dataset_arrow() -> RecordBatch { + // 64,000 rows of string and numeric data. + // 8,000 values of first string, second string, third string, etc. + + let names = vec![ + "Alexander", + "Anastasia", + "Archibald", + "Bartholomew", + "Benjamin", + "Christopher", + "Elizabeth", + "Gabriella", + ]; + + let mut col1 = StringBuilder::with_capacity(64_000, 64_000_000); + let mut col2 = UInt32Builder::with_capacity(64_000); + for i in 0..64_000 { + col1.append_value(names[i % 8]); + col2.append_value(u32::try_from(i).unwrap()); + } + + let col1 = col1.finish(); + let col2 = col2.finish(); + + RecordBatch::try_new( + Arc::new(Schema::new(vec![ + Field::new("names", DataType::Utf8, false), + Field::new("scores", DataType::UInt32, false), + ])), + vec![Arc::new(col1), Arc::new(col2)], + ) + .unwrap() +} + +fn toy_dataset_vortex() -> Array { + let uncompressed = toy_dataset_arrow().to_array_data().into_array(); + println!("uncompressed vortex size: {}B", uncompressed.nbytes()); + + let compressor = Compressor::new(&CTX); + let compressed = compressor.compress(&uncompressed, None).unwrap(); + println!("compressed vortex size: {} B", compressed.nbytes()); + compressed +} + +fn bench_datafusion(c: &mut Criterion) { + let mut group = c.benchmark_group("datafusion"); + + let session = SessionContext::new(); + + let arrow_dataset = toy_dataset_arrow(); + let arrow_table = + Arc::new(MemTable::try_new(arrow_dataset.schema(), vec![vec![arrow_dataset]]).unwrap()); + + group.bench_function("arrow", |b| { + let arrow_table = arrow_table.clone(); + b.to_async( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + .iter(|| async { + black_box(session.read_table(arrow_table.clone()).unwrap()) + .filter(col("scores").gt_eq(lit(3_000))) + .unwrap() + .filter(col("scores").lt_eq(lit(4_000))) + .unwrap() + .aggregate(vec![], vec![sum(col("scores"))]) + .unwrap() + .collect() + .await + .unwrap(); + }) + }); + + let vortex_dataset = toy_dataset_vortex(); + let vortex_table_pushdown = Arc::new( + VortexMemTable::try_new(vortex_dataset, VortexMemTableOptions::default()).unwrap(), + ); + group.bench_function("vortex_pushdown", |b| { + let vortex_table_pushdown = vortex_table_pushdown.clone(); + b.to_async(tokio::runtime::Runtime::new().unwrap()) + .iter(|| async { + black_box(session.read_table(vortex_table_pushdown.clone()).unwrap()) + 
.filter(col("scores").gt_eq(lit(3_000))) + .unwrap() + .filter(col("scores").lt_eq(lit(4_000))) + .unwrap() + .aggregate(vec![], vec![sum(col("scores"))]) + .unwrap() + .collect() + .await + .unwrap(); + }) + }); + + let vortex_dataset = toy_dataset_vortex(); + let vortex_table_no_pushdown = Arc::new( + VortexMemTable::try_new( + vortex_dataset, + VortexMemTableOptions::default().with_disable_pushdown(true), + ) + .unwrap(), + ); + group.bench_function("vortex_no_pushdown", |b| { + let vortex_table_no_pushdown = vortex_table_no_pushdown.clone(); + b.to_async( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + .iter(|| async { + black_box( + session + .read_table(vortex_table_no_pushdown.clone()) + .unwrap(), + ) + .filter(col("scores").gt_eq(lit(3_000))) + .unwrap() + .filter(col("scores").lt_eq(lit(4_000))) + .unwrap() + .aggregate(vec![], vec![sum(col("scores"))]) + .unwrap() + .collect() + .await + .unwrap(); + }) + }); +} + +criterion_group!(benches, bench_datafusion); +criterion_main!(benches); diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index e52a819688..b02c63f273 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -45,7 +45,7 @@ pub struct VortexMemTableOptions { } impl VortexMemTableOptions { - pub fn with_disable_pushdown(&mut self, disable_pushdown: bool) -> &mut Self { + pub fn with_disable_pushdown(mut self, disable_pushdown: bool) -> Self { self.disable_pushdown = disable_pushdown; self } @@ -83,7 +83,7 @@ impl SessionContextExt for SessionContext { /// Only arrays that have a top-level [struct type](vortex_dtype::StructDType) can be exposed as /// a table to DataFusion. #[derive(Debug, Clone)] -pub(crate) struct VortexMemTable { +pub struct VortexMemTable { array: Array, schema_ref: SchemaRef, options: VortexMemTableOptions, @@ -297,6 +297,8 @@ fn can_be_pushed_down(expr: &Expr, schema_columns: &HashSet) -> DFResult | Expr::IsFalse(_) | Expr::IsNotTrue(_) | Expr::IsNotFalse(_) + | Expr::Column(_) + | Expr::Literal(_) // TODO(aduffy): ensure that cast can be pushed down. | Expr::Cast(_) => true, _ => false, @@ -374,17 +376,20 @@ fn execute_unfiltered( array: &Array, projection: &Vec, ) -> DFResult { + println!("EXECUTE_UNFILTERED"); // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. 
let struct_array = array .clone() .into_struct() .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; + println!("PROJECTION: {:?}", projection); let projected_struct = struct_array .project(projection.as_slice()) .map_err(|vortex_err| { exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") })?; + println!("PROJECTED SCHEMA: {:?}", projected_struct.dtype()); let batch = RecordBatch::from( projected_struct .into_canonical() @@ -463,6 +468,7 @@ impl ExecutionPlan for VortexScanExec { partition: usize, _context: Arc, ) -> DFResult { + println!("EXECUTE VortexScanExec"); let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { chunked_array .chunk(partition) @@ -485,13 +491,12 @@ mod test { use vortex::array::struct_::StructArray; use vortex::array::varbin::VarBinArray; use vortex::validity::Validity; - use vortex::IntoArray; + use vortex::{Array, IntoArray}; use vortex_dtype::{DType, Nullability}; - use crate::SessionContextExt; + use crate::{SessionContextExt, VortexMemTableOptions}; - #[tokio::test] - async fn test_datafusion_simple() { + fn presidents_array() -> Array { let names = VarBinArray::from_vec( vec![ "Washington", @@ -508,15 +513,53 @@ mod test { Validity::NonNullable, ); - let presidents = StructArray::from_fields(&[ + StructArray::from_fields(&[ ("president", names.into_array()), ("term_start", term_start.into_array()), ]) - .into_array(); + .into_array() + } + + #[tokio::test] + async fn test_datafusion_pushdown() { + let ctx = SessionContext::new(); + + let df = ctx.read_vortex(presidents_array()).unwrap(); + + let distinct_names = df + .filter(col("term_start").gt_eq(lit(1795))) + .unwrap() + .aggregate(vec![], vec![count_distinct(col("president"))]) + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(distinct_names.len(), 1); + + assert_eq!( + *distinct_names[0] + .column(0) + .as_primitive::() + .values() + .first() + .unwrap(), + 4i64 + ); + } + + #[tokio::test] + async fn test_datafusion_no_pushdown() { let ctx = SessionContext::new(); - let df = ctx.read_vortex(presidents).unwrap(); + let df = ctx + .read_vortex_opts( + presidents_array(), + // Disable pushdown. We run this test to make sure that the naive codepath also + // produces correct results and does not panic anywhere. 
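+                // With pushdown disabled, DataFusion keeps its own FilterExec above the
+                // VortexScanExec instead of using the RowSelector/TakeRows plan.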
+ VortexMemTableOptions::default().with_disable_pushdown(true), + ) + .unwrap(); let distinct_names = df .filter(col("term_start").gt_eq(lit(1795))) diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index f4906afe6a..0b4c2f8f53 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -9,8 +9,9 @@ use std::task::{Context, Poll}; use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; -use arrow_array::{ArrayRef, RecordBatch, UInt64Array}; +use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use datafusion::arrow::compute::cast; use datafusion::prelude::SessionContext; use datafusion_common::{DFSchema, Result as DFResult}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; @@ -70,7 +71,9 @@ impl RowSelectorExec { impl Debug for RowSelectorExec { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RowSelectorExec").finish() + f.debug_struct("RowSelectorExec") + .field("filter_exprs", &self.filter_exprs) + .finish() } } @@ -102,7 +105,7 @@ impl ExecutionPlan for RowSelectorExec { self: Arc, _children: Vec>, ) -> DFResult> { - panic!("with_new_children not supported for RowSelectorExec") + Ok(self) } fn execute( @@ -171,13 +174,11 @@ where type Item = DFResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - println!("BEGIN poll_next"); let this = self.project(); // If we have already polled the one-shot future with the filter records, indicate // that the stream has finished. if *this.polled_inner { - println!("EXIT"); return Poll::Ready(None); } @@ -186,7 +187,6 @@ where // initial batch for us to process. // // We want to avoid ever calling it again. - println!("POLL record_batch"); let record_batch = ready!(this.inner.poll(cx))?; *this.polled_inner = true; @@ -195,7 +195,6 @@ where // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. - println!("CREATE session"); let session = SessionContext::new(); let df_schema = DFSchema::try_from(this.schema_ref.clone())?; let physical_expr = @@ -216,7 +215,6 @@ where let indices: ArrayRef = Arc::new(UInt64Array::from(selection_indices)); let indices_batch = RecordBatch::try_new(ROW_SELECTOR_SCHEMA_REF.clone(), vec![indices])?; - println!("RETURNING Poll::Ready"); Poll::Ready(Some(Ok(indices_batch))) } } @@ -274,7 +272,10 @@ impl TakeRowsExec { impl Debug for TakeRowsExec { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Take").finish() + f.debug_struct("TakeRowsExec") + .field("projection", &self.projection) + .field("output_schema", &self.output_schema) + .finish() } } @@ -301,7 +302,7 @@ impl ExecutionPlan for TakeRowsExec { self: Arc, _children: Vec>, ) -> DFResult> { - panic!("unsupported with_new_children for {:?}", &self) + Ok(self) } fn execute( @@ -374,22 +375,36 @@ where ArrayData::from_arrow(record_batch.column(0).as_primitive::(), false) .into_array(); + // If no columns in the output projection, we send back a RecordBatch with empty schema. + // This is common for COUNT queries. + if this.output_projection.is_empty() { + let opts = RecordBatchOptions::new().with_row_count(Some(row_indices.len())); + return Poll::Ready(Some(Ok(RecordBatch::try_new_with_options( + Arc::new(Schema::empty()), + vec![], + &opts, + ) + .unwrap()))); + } + // Assemble the output columns using the row indices. 
// NOTE(aduffy): this re-decodes the fields from the filter schema, which is unnecessary. let mut columns = Vec::new(); - for field_idx in this.output_projection { - let encoded = this.vortex_array.field(*field_idx).unwrap(); + for (output_idx, src_idx) in this.output_projection.iter().enumerate() { + let encoded = this.vortex_array.field(*src_idx).expect("field access"); let decoded = take(&encoded, &row_indices) - .unwrap() + .expect("take") .into_canonical() - .unwrap() + .expect("into_canonical") .into_arrow(); + let data_type = this.output_schema.field(output_idx).data_type(); - columns.push(decoded); + columns.push(cast(&decoded, data_type).expect("cast")); } - // Send back a single record batch of the decoded data - let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns).unwrap(); + // Send back a single record batch of the decoded data. + let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns) + .expect("RecordBatch::try_new"); Poll::Ready(Some(Ok(output_batch))) } From f417c510b390c4a5f0f9552a3ca73376c06e3c5b Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 10:25:36 -0400 Subject: [PATCH 07/16] fix bug in VortexScanExec, address some comments --- bench-vortex/benches/datafusion_benchmark.rs | 144 ++++++++++--------- vortex-datafusion/src/lib.rs | 64 +++++---- vortex-datafusion/src/plans.rs | 28 ++-- 3 files changed, 126 insertions(+), 110 deletions(-) diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index 1e9204039c..50b0c125a1 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -3,15 +3,19 @@ use std::sync::Arc; use arrow_array::builder::{StringBuilder, UInt32Builder}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema}; -use criterion::{black_box, criterion_group, criterion_main, Criterion}; -use datafusion::datasource::MemTable; +use criterion::{BenchmarkGroup, black_box, Criterion, criterion_group, criterion_main}; +use criterion::measurement::Measurement; +use datafusion::common::Result as DFResult; +use datafusion::datasource::{MemTable, TableProvider}; +use datafusion::execution::memory_pool::human_readable_size; use datafusion::functions_aggregate::expr_fn::sum; use datafusion::logical_expr::lit; -use datafusion::prelude::{col, SessionContext}; +use datafusion::prelude::{col, DataFrame, SessionContext}; use lazy_static::lazy_static; + +use vortex::{Array, Context, IntoArray, ToArrayData}; use vortex::compress::Compressor; use vortex::encoding::EncodingRef; -use vortex::{Array, Context, IntoArray, ToArrayData}; use vortex_datafusion::{VortexMemTable, VortexMemTableOptions}; use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; @@ -28,7 +32,7 @@ fn toy_dataset_arrow() -> RecordBatch { // 64,000 rows of string and numeric data. // 8,000 values of first string, second string, third string, etc. 
- let names = vec![ + let names = [ "Alexander", "Anastasia", "Archibald", @@ -61,25 +65,33 @@ fn toy_dataset_arrow() -> RecordBatch { fn toy_dataset_vortex() -> Array { let uncompressed = toy_dataset_arrow().to_array_data().into_array(); - println!("uncompressed vortex size: {}B", uncompressed.nbytes()); + println!( + "uncompressed size: {:?}", + human_readable_size(uncompressed.nbytes()) + ); let compressor = Compressor::new(&CTX); let compressed = compressor.compress(&uncompressed, None).unwrap(); - println!("compressed vortex size: {} B", compressed.nbytes()); + println!( + "vortex compressed size: {:?}", + human_readable_size(compressed.nbytes()) + ); compressed } -fn bench_datafusion(c: &mut Criterion) { - let mut group = c.benchmark_group("datafusion"); - - let session = SessionContext::new(); - - let arrow_dataset = toy_dataset_arrow(); - let arrow_table = - Arc::new(MemTable::try_new(arrow_dataset.schema(), vec![vec![arrow_dataset]]).unwrap()); +fn filter_agg_query(df: DataFrame) -> DFResult { + // SELECT SUM(scores) FROM table WHERE scores >= 3000 AND scores <= 4000 + df.filter(col("scores").gt_eq(lit(3_000)))? + .filter(col("scores").lt_eq(lit(4_000)))? + .aggregate(vec![], vec![sum(col("scores"))]) +} - group.bench_function("arrow", |b| { - let arrow_table = arrow_table.clone(); +fn measure_provider( + group: &mut BenchmarkGroup, + session: &SessionContext, + table: Arc, +) { + group.bench_function("planning", |b| { b.to_async( tokio::runtime::Builder::new_current_thread() .enable_all() @@ -87,40 +99,57 @@ fn bench_datafusion(c: &mut Criterion) { .unwrap(), ) .iter(|| async { - black_box(session.read_table(arrow_table.clone()).unwrap()) - .filter(col("scores").gt_eq(lit(3_000))) + // Force physical planner to execute on our TableProvider. 
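+            // create_physical_plan() exercises scan() and the pushdown negotiation without
+            // running the query, so this group isolates planning overhead.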
+ filter_agg_query(black_box(session).read_table(table.clone()).unwrap()) .unwrap() - .filter(col("scores").lt_eq(lit(4_000))) - .unwrap() - .aggregate(vec![], vec![sum(col("scores"))]) + .create_physical_plan() + .await + .unwrap(); + }); + }); + + group.bench_function("exec", |b| { + b.to_async( + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(), + ) + .iter(|| async { + // Force full query execution with .collect() + filter_agg_query(black_box(session).read_table(table.clone()).unwrap()) .unwrap() .collect() .await .unwrap(); - }) + }); }); +} + +fn bench_arrow(mut group: BenchmarkGroup, session: &SessionContext) { + let arrow_dataset = toy_dataset_arrow(); + let arrow_table = + Arc::new(MemTable::try_new(arrow_dataset.schema(), vec![vec![arrow_dataset]]).unwrap()); + + measure_provider(&mut group, session, arrow_table); +} +fn bench_vortex_pushdown_enabled( + mut group: BenchmarkGroup, + session: &SessionContext, +) { let vortex_dataset = toy_dataset_vortex(); let vortex_table_pushdown = Arc::new( VortexMemTable::try_new(vortex_dataset, VortexMemTableOptions::default()).unwrap(), ); - group.bench_function("vortex_pushdown", |b| { - let vortex_table_pushdown = vortex_table_pushdown.clone(); - b.to_async(tokio::runtime::Runtime::new().unwrap()) - .iter(|| async { - black_box(session.read_table(vortex_table_pushdown.clone()).unwrap()) - .filter(col("scores").gt_eq(lit(3_000))) - .unwrap() - .filter(col("scores").lt_eq(lit(4_000))) - .unwrap() - .aggregate(vec![], vec![sum(col("scores"))]) - .unwrap() - .collect() - .await - .unwrap(); - }) - }); + measure_provider(&mut group, session, vortex_table_pushdown); +} + +fn bench_vortex_pushdown_disabled( + mut group: BenchmarkGroup, + session: &SessionContext, +) { let vortex_dataset = toy_dataset_vortex(); let vortex_table_no_pushdown = Arc::new( VortexMemTable::try_new( @@ -129,31 +158,18 @@ fn bench_datafusion(c: &mut Criterion) { ) .unwrap(), ); - group.bench_function("vortex_no_pushdown", |b| { - let vortex_table_no_pushdown = vortex_table_no_pushdown.clone(); - b.to_async( - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(), - ) - .iter(|| async { - black_box( - session - .read_table(vortex_table_no_pushdown.clone()) - .unwrap(), - ) - .filter(col("scores").gt_eq(lit(3_000))) - .unwrap() - .filter(col("scores").lt_eq(lit(4_000))) - .unwrap() - .aggregate(vec![], vec![sum(col("scores"))]) - .unwrap() - .collect() - .await - .unwrap(); - }) - }); + + measure_provider(&mut group, session, vortex_table_no_pushdown); +} + +fn bench_datafusion(c: &mut Criterion) { + bench_arrow(c.benchmark_group("arrow"), &SessionContext::new()); + bench_vortex_pushdown_enabled(c.benchmark_group("vortex-pushdown"), &SessionContext::new()); + + bench_vortex_pushdown_disabled( + c.benchmark_group("vortex-no_pushdown"), + &SessionContext::new(), + ); } criterion_group!(benches, bench_datafusion); diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs index b02c63f273..fce0fc1e99 100644 --- a/vortex-datafusion/src/lib.rs +++ b/vortex-datafusion/src/lib.rs @@ -127,7 +127,7 @@ impl TableProvider for VortexMemTable { /// The array is flattened directly into the nearest Arrow-compatible encoding. 
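    ///
    /// When at least one filter is accepted for pushdown, the returned plan is the two-stage
    /// RowSelectorExec/TakeRowsExec pipeline; otherwise a single VortexScanExec performs the
    /// projected scan.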
async fn scan( &self, - _state: &SessionState, + state: &SessionState, projection: Option<&Vec>, filters: &[Expr], _limit: Option, @@ -151,46 +151,55 @@ impl TableProvider for VortexMemTable { Some(filters.to_vec()) }; - let filter_projection = filter_exprs - .clone() - .map(|exprs| get_filter_projection(exprs.as_slice(), self.schema_ref.clone())); - let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { Partitioning::RoundRobinBatch(chunked_array.nchunks()) } else { Partitioning::UnknownPartitioning(1) }; - let plan_properties = PlanProperties::new( - EquivalenceProperties::new(self.schema_ref.clone()), - partitioning, - ExecutionMode::Bounded, - ); - let output_projection: Vec = match projection { None => (0..self.schema_ref.fields().len()).collect(), Some(proj) => proj.clone(), }; - match (filter_exprs, filter_projection) { + match filter_exprs { // If there is a filter expression, we execute in two phases, first performing a filter // on the input to get back row indices, and then taking the remaining struct columns // using the calculated indices from the filter. - (Some(filter_exprs), Some(filter_projection)) => Ok(make_filter_then_take_plan( - self.schema_ref.clone(), - filter_exprs, - filter_projection, - self.array.clone(), - output_projection.clone(), - )), + Some(filter_exprs) => { + let filter_projection = + get_filter_projection(filter_exprs.as_slice(), self.schema_ref.clone()); + + Ok(make_filter_then_take_plan( + self.schema_ref.clone(), + filter_exprs, + filter_projection, + self.array.clone(), + output_projection.clone(), + state, + )) + } // If no filters were pushed down, we materialize the entire StructArray into a // RecordBatch and let DataFusion process the entire query. - _ => Ok(Arc::new(VortexScanExec { - array: self.array.clone(), - scan_projection: output_projection.clone(), - plan_properties, - })), + _ => { + let output_schema = Arc::new( + self.schema_ref + .project(output_projection.as_slice()) + .expect("project output schema"), + ); + let plan_properties = PlanProperties::new( + EquivalenceProperties::new(output_schema), + partitioning, + ExecutionMode::Bounded, + ); + + Ok(Arc::new(VortexScanExec { + array: self.array.clone(), + scan_projection: output_projection.clone(), + plan_properties, + })) + } } } @@ -244,6 +253,7 @@ fn make_filter_then_take_plan( filter_projection: Vec, array: Array, output_projection: Vec, + _session_state: &SessionState, ) -> Arc { let struct_array = StructArray::try_from(array).unwrap(); @@ -376,20 +386,17 @@ fn execute_unfiltered( array: &Array, projection: &Vec, ) -> DFResult { - println!("EXECUTE_UNFILTERED"); // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef. 
let struct_array = array .clone() .into_struct() .map_err(|vortex_error| DataFusionError::Execution(format!("{}", vortex_error)))?; - println!("PROJECTION: {:?}", projection); let projected_struct = struct_array .project(projection.as_slice()) .map_err(|vortex_err| { exec_datafusion_err!("projection pushdown to Vortex failed: {vortex_err}") })?; - println!("PROJECTED SCHEMA: {:?}", projected_struct.dtype()); let batch = RecordBatch::from( projected_struct .into_canonical() @@ -468,7 +475,6 @@ impl ExecutionPlan for VortexScanExec { partition: usize, _context: Arc, ) -> DFResult { - println!("EXECUTE VortexScanExec"); let chunk = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) { chunked_array .chunk(partition) @@ -564,6 +570,8 @@ mod test { let distinct_names = df .filter(col("term_start").gt_eq(lit(1795))) .unwrap() + .filter(col("term_start").lt(lit(2000))) + .unwrap() .aggregate(vec![], vec![count_distinct(col("president"))]) .unwrap() .collect() diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index 0b4c2f8f53..e17ebca8e3 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -12,7 +12,7 @@ use arrow_array::types::UInt64Type; use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::compute::cast; -use datafusion::prelude::SessionContext; +use datafusion::execution::context::SessionState; use datafusion_common::{DFSchema, Result as DFResult}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion_expr::Expr; @@ -111,7 +111,7 @@ impl ExecutionPlan for RowSelectorExec { fn execute( &self, partition: usize, - _context: Arc, + context: Arc, ) -> DFResult { assert_eq!( partition, 0, @@ -136,6 +136,7 @@ impl ExecutionPlan for RowSelectorExec { polled_inner: false, conjunction_expr, schema_ref: stream_schema, + context: context.clone(), })) } } @@ -151,20 +152,7 @@ pub(crate) struct RowIndicesStream { conjunction_expr: Expr, schema_ref: SchemaRef, -} - -impl RowIndicesStream -where - F: Future>, -{ - pub fn new(record_batch_fut: F, conjunction_expr: Expr, schema_ref: SchemaRef) -> Self { - Self { - inner: record_batch_fut, - polled_inner: false, - conjunction_expr, - schema_ref, - } - } + context: Arc, } impl Stream for RowIndicesStream @@ -195,10 +183,13 @@ where // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. - let session = SessionContext::new(); + let session_state = SessionState::new_with_config_rt( + this.context.session_config().clone(), + this.context.runtime_env().clone(), + ); let df_schema = DFSchema::try_from(this.schema_ref.clone())?; let physical_expr = - session.create_physical_expr(this.conjunction_expr.clone(), &df_schema)?; + session_state.create_physical_expr(this.conjunction_expr.clone(), &df_schema)?; let selection = physical_expr .evaluate(&record_batch)? 
.into_array(record_batch.num_rows())?; @@ -455,6 +446,7 @@ mod test { polled_inner: false, conjunction_expr: and((col("a") % lit(2)).eq(lit(0)), col("b").is_true()), schema_ref: _schema, + context: Arc::new(Default::default()), }; let rows: Vec = futures::executor::block_on_stream(filtering_stream) From 5d65ad4d2eb114b820991758e382f27e2e745548 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 10:32:31 -0400 Subject: [PATCH 08/16] remove SessionState creation --- vortex-datafusion/src/plans.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs index e17ebca8e3..cc4d839080 100644 --- a/vortex-datafusion/src/plans.rs +++ b/vortex-datafusion/src/plans.rs @@ -12,11 +12,10 @@ use arrow_array::types::UInt64Type; use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array}; use arrow_schema::{DataType, Field, Schema, SchemaRef}; use datafusion::arrow::compute::cast; -use datafusion::execution::context::SessionState; use datafusion_common::{DFSchema, Result as DFResult}; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion_expr::Expr; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{create_physical_expr, EquivalenceProperties, Partitioning}; use datafusion_physical_plan::{ DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, }; @@ -183,13 +182,9 @@ where // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. - let session_state = SessionState::new_with_config_rt( - this.context.session_config().clone(), - this.context.runtime_env().clone(), - ); let df_schema = DFSchema::try_from(this.schema_ref.clone())?; let physical_expr = - session_state.create_physical_expr(this.conjunction_expr.clone(), &df_schema)?; + create_physical_expr(this.conjunction_expr, &df_schema, &Default::default())?; let selection = physical_expr .evaluate(&record_batch)? 
.into_array(record_batch.num_rows())?; From 0ad3007eff7f28efaacbefaf49849a33f15def89 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 10:37:07 -0400 Subject: [PATCH 09/16] more samples --- bench-vortex/benches/datafusion_benchmark.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index 50b0c125a1..f8dbc9db00 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -3,8 +3,8 @@ use std::sync::Arc; use arrow_array::builder::{StringBuilder, UInt32Builder}; use arrow_array::RecordBatch; use arrow_schema::{DataType, Field, Schema}; -use criterion::{BenchmarkGroup, black_box, Criterion, criterion_group, criterion_main}; use criterion::measurement::Measurement; +use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Criterion}; use datafusion::common::Result as DFResult; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::execution::memory_pool::human_readable_size; @@ -12,10 +12,9 @@ use datafusion::functions_aggregate::expr_fn::sum; use datafusion::logical_expr::lit; use datafusion::prelude::{col, DataFrame, SessionContext}; use lazy_static::lazy_static; - -use vortex::{Array, Context, IntoArray, ToArrayData}; use vortex::compress::Compressor; use vortex::encoding::EncodingRef; +use vortex::{Array, Context, IntoArray, ToArrayData}; use vortex_datafusion::{VortexMemTable, VortexMemTableOptions}; use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; @@ -43,9 +42,9 @@ fn toy_dataset_arrow() -> RecordBatch { "Gabriella", ]; - let mut col1 = StringBuilder::with_capacity(64_000, 64_000_000); - let mut col2 = UInt32Builder::with_capacity(64_000); - for i in 0..64_000 { + let mut col1 = StringBuilder::with_capacity(640_000, 64_000_000); + let mut col2 = UInt32Builder::with_capacity(640_000); + for i in 0..640_000 { col1.append_value(names[i % 8]); col2.append_value(u32::try_from(i).unwrap()); } From f17cb70188997eeaf624e61d53cb3bcba3a50c3c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 10:47:46 -0400 Subject: [PATCH 10/16] renable DictEncoding for bench --- bench-vortex/benches/datafusion_benchmark.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index f8dbc9db00..4ea5740e4e 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -16,12 +16,13 @@ use vortex::compress::Compressor; use vortex::encoding::EncodingRef; use vortex::{Array, Context, IntoArray, ToArrayData}; use vortex_datafusion::{VortexMemTable, VortexMemTableOptions}; +use vortex_dict::DictEncoding; use vortex_fastlanes::{BitPackedEncoding, DeltaEncoding, FoREncoding}; lazy_static! 
{ pub static ref CTX: Context = Context::default().with_encodings([ &BitPackedEncoding as EncodingRef, - // &DictEncoding, + &DictEncoding, &FoREncoding, &DeltaEncoding, ]); From c3abd3db5823461220ecf9f5e78c3faa2cdd598d Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 11:02:26 -0400 Subject: [PATCH 11/16] cleanup bench, add comment, fix unit test --- bench-vortex/benches/datafusion_benchmark.rs | 62 +++++++++++++------- vortex-array/src/array/struct_/mod.rs | 6 +- vortex-datafusion/src/plans.rs | 2 +- 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index 4ea5740e4e..a19e1809e1 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -63,9 +63,13 @@ fn toy_dataset_arrow() -> RecordBatch { .unwrap() } -fn toy_dataset_vortex() -> Array { +fn toy_dataset_vortex(compress: bool) -> Array { let uncompressed = toy_dataset_arrow().to_array_data().into_array(); + if !compress { + return uncompressed; + } + println!( "uncompressed size: {:?}", human_readable_size(uncompressed.nbytes()) @@ -134,41 +138,57 @@ fn bench_arrow(mut group: BenchmarkGroup, session: &SessionCo measure_provider(&mut group, session, arrow_table); } -fn bench_vortex_pushdown_enabled( - mut group: BenchmarkGroup, - session: &SessionContext, -) { - let vortex_dataset = toy_dataset_vortex(); - let vortex_table_pushdown = Arc::new( - VortexMemTable::try_new(vortex_dataset, VortexMemTableOptions::default()).unwrap(), - ); - - measure_provider(&mut group, session, vortex_table_pushdown); -} - -fn bench_vortex_pushdown_disabled( +fn bench_vortex( mut group: BenchmarkGroup, session: &SessionContext, + disable_pushdown: bool, + compress: bool, ) { - let vortex_dataset = toy_dataset_vortex(); - let vortex_table_no_pushdown = Arc::new( + let vortex_dataset = toy_dataset_vortex(compress); + let vortex_table = Arc::new( VortexMemTable::try_new( vortex_dataset, - VortexMemTableOptions::default().with_disable_pushdown(true), + VortexMemTableOptions::default().with_disable_pushdown(disable_pushdown), ) .unwrap(), ); - measure_provider(&mut group, session, vortex_table_no_pushdown); + measure_provider(&mut group, session, vortex_table); } fn bench_datafusion(c: &mut Criterion) { bench_arrow(c.benchmark_group("arrow"), &SessionContext::new()); - bench_vortex_pushdown_enabled(c.benchmark_group("vortex-pushdown"), &SessionContext::new()); - bench_vortex_pushdown_disabled( - c.benchmark_group("vortex-no_pushdown"), + // compress=true, pushdown enabled + bench_vortex( + c.benchmark_group("vortex-pushdown-compressed"), + &SessionContext::new(), + false, + true, + ); + + // compress=false, pushdown enabled + bench_vortex( + c.benchmark_group("vortex-pushdown-uncompressed"), + &SessionContext::new(), + false, + false, + ); + + // compress=true, pushdown disabled + bench_vortex( + c.benchmark_group("vortex-nopushdown-uncompressed"), + &SessionContext::new(), + true, + true, + ); + + // compress=false, pushdown disabled + bench_vortex( + c.benchmark_group("vortex-nopushdown-uncompressed"), &SessionContext::new(), + true, + false, ); } diff --git a/vortex-array/src/array/struct_/mod.rs b/vortex-array/src/array/struct_/mod.rs index cac5fecb49..0a2be3af00 100644 --- a/vortex-array/src/array/struct_/mod.rs +++ b/vortex-array/src/array/struct_/mod.rs @@ -120,6 +120,8 @@ impl StructArray { } impl StructArray { + // TODO(aduffy): Add equivalent function to support field masks for 
nested column access.
+
    /// Return a new StructArray with the given projection applied.
    ///
    /// Projection does not copy data arrays. Projection is defined by an ordinal array slice
@@ -127,7 +129,9 @@ impl StructArray {
    /// perform column re-ordering, deletion, or duplication at a logical level, without any data
    /// copying.
    ///
-    /// This function will return an error if the projection includes invalid column IDs.
+    /// # Panics
+    /// This function will panic if the projection references columns not within the
+    /// schema boundaries.
    pub fn project(&self, projection: &[usize]) -> VortexResult<Self> {
        let mut children = Vec::with_capacity(projection.len());
        let mut names = Vec::with_capacity(projection.len());
diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs
index cc4d839080..6c774c35cd 100644
--- a/vortex-datafusion/src/plans.rs
+++ b/vortex-datafusion/src/plans.rs
@@ -439,7 +439,7 @@ mod test {
        let filtering_stream = RowIndicesStream {
            inner,
            polled_inner: false,
-            conjunction_expr: and((col("a") % lit(2)).eq(lit(0)), col("b").is_true()),
+            conjunction_expr: and((col("a") % lit(2u64)).eq(lit(0u64)), col("b").is_true()),
            schema_ref: _schema,
            context: Arc::new(Default::default()),
        };

From fcc964d05ce3076632eb54fb0b41b0ecbe5b6715 Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Wed, 26 Jun 2024 11:08:56 -0400
Subject: [PATCH 12/16] fix test name

---
 bench-vortex/benches/datafusion_benchmark.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs
index a19e1809e1..3e683d8544 100644
--- a/bench-vortex/benches/datafusion_benchmark.rs
+++ b/bench-vortex/benches/datafusion_benchmark.rs
@@ -177,7 +177,7 @@ fn bench_datafusion(c: &mut Criterion) {
 
     // compress=true, pushdown disabled
     bench_vortex(
-        c.benchmark_group("vortex-nopushdown-uncompressed"),
+        c.benchmark_group("vortex-nopushdown-compressed"),
        &SessionContext::new(),
        true,
        true,

From f2f6c26b756b96098e4d200a2d423b90572b815a Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Wed, 26 Jun 2024 11:55:28 -0400
Subject: [PATCH 13/16] RowIndicesExec receives a vortex array, not a
 recordbatch

---
 vortex-datafusion/src/plans.rs | 92 ++++++++++++++++++----------------
 1 file changed, 49 insertions(+), 43 deletions(-)

diff --git a/vortex-datafusion/src/plans.rs b/vortex-datafusion/src/plans.rs
index 6c774c35cd..2e0daf3159 100644
--- a/vortex-datafusion/src/plans.rs
+++ b/vortex-datafusion/src/plans.rs
@@ -11,7 +11,6 @@ use arrow_array::cast::AsArray;
 use arrow_array::types::UInt64Type;
 use arrow_array::{ArrayRef, RecordBatch, RecordBatchOptions, UInt64Array};
 use arrow_schema::{DataType, Field, Schema, SchemaRef};
-use datafusion::arrow::compute::cast;
 use datafusion_common::{DFSchema, Result as DFResult};
 use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
 use datafusion_expr::Expr;
@@ -25,7 +24,7 @@ use pin_project::pin_project;
 use vortex::array::struct_::StructArray;
 use vortex::arrow::FromArrowArray;
 use vortex::compute::take::take;
-use vortex::{ArrayDType, ArrayData, IntoArray, IntoCanonical};
+use vortex::{Array, ArrayDType, ArrayData, IntoArray, IntoCanonical};
 
 use crate::datatype::infer_schema;
 use crate::expr::{make_conjunction, simplify_expr};
@@ -120,10 +119,7 @@ impl ExecutionPlan for RowSelectorExec {
        let stream_schema = Arc::new(infer_schema(self.filter_struct.dtype()));
 
        let filter_struct = self.filter_struct.clone();
-        let inner = Box::pin(async move {
-            let
arrow_array = filter_struct.into_canonical().unwrap().into_arrow(); - Ok(RecordBatch::from(arrow_array.as_struct())) - }); + let one_shot = Box::pin(async move { filter_struct.into_array() }); let conjunction_expr = simplify_expr( &make_conjunction(&self.filter_exprs)?, @@ -131,7 +127,7 @@ impl ExecutionPlan for RowSelectorExec { )?; Ok(Box::pin(RowIndicesStream { - inner, + one_shot, polled_inner: false, conjunction_expr, schema_ref: stream_schema, @@ -144,8 +140,9 @@ impl ExecutionPlan for RowSelectorExec { #[pin_project::pin_project] pub(crate) struct RowIndicesStream { /// The inner future that returns `DFResult`. + /// This future should only poll one time. #[pin] - inner: F, + one_shot: F, polled_inner: bool, @@ -156,14 +153,14 @@ pub(crate) struct RowIndicesStream { impl Stream for RowIndicesStream where - F: Future>, + F: Future, { type Item = DFResult; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.project(); - // If we have already polled the one-shot future with the filter records, indicate + // If we have already polled the one-shot future of filter records, indicate // that the stream has finished. if *this.polled_inner { return Poll::Ready(None); @@ -172,13 +169,20 @@ where // Get the unfiltered record batch. // Since this is a one-shot, we only want to poll the inner future once, to create the // initial batch for us to process. - // - // We want to avoid ever calling it again. - let record_batch = ready!(this.inner.poll(cx))?; + let vortex_struct = ready!(this.one_shot.poll(cx)); *this.polled_inner = true; - // Using a local SessionContext, generate a physical plan to execute the conjunction query - // against the filter columns. + // Immediately convert to Arrow RecordBatch for processing. + // TODO(aduffy): attempt to pushdown the filter to Vortex without decoding. + let record_batch = RecordBatch::from( + vortex_struct + .into_canonical() + .unwrap() + .into_arrow() + .as_struct(), + ); + + // Generate a physical plan to execute the conjunction query against the filter columns. // // The result of a conjunction expression is a BooleanArray containing `true` for rows // where the conjunction was satisfied, and `false` otherwise. @@ -207,7 +211,7 @@ where impl RecordBatchStream for RowIndicesStream where - F: Future>, + F: Future, { fn schema(&self) -> SchemaRef { self.schema_ref.clone() @@ -238,14 +242,13 @@ impl TakeRowsExec { row_indices: Arc, table: &StructArray, ) -> Self { + let output_schema = Arc::new(schema_ref.project(projection).unwrap()); let plan_properties = PlanProperties::new( - EquivalenceProperties::new(schema_ref.clone()), + EquivalenceProperties::new(output_schema.clone()), Partitioning::RoundRobinBatch(1), ExecutionMode::Bounded, ); - let output_schema = Arc::new(schema_ref.project(projection).unwrap()); - Self { plan_properties, projection: projection.to_owned(), @@ -373,24 +376,18 @@ where .unwrap()))); } - // Assemble the output columns using the row indices. - // NOTE(aduffy): this re-decodes the fields from the filter schema, which is unnecessary. 
- let mut columns = Vec::new(); - for (output_idx, src_idx) in this.output_projection.iter().enumerate() { - let encoded = this.vortex_array.field(*src_idx).expect("field access"); - let decoded = take(&encoded, &row_indices) - .expect("take") - .into_canonical() - .expect("into_canonical") - .into_arrow(); - let data_type = this.output_schema.field(output_idx).data_type(); - - columns.push(cast(&decoded, data_type).expect("cast")); - } + // TODO(aduffy): this re-decodes the fields from the filter schema, which is wasteful. + // We should find a way to avoid decoding the filter columns and only decode the other + // columns, then stitch the StructArray back together from those. + let projected_for_output = this.vortex_array.project(this.output_projection).unwrap(); + let decoded = take(&projected_for_output.into_array(), &row_indices) + .expect("take") + .into_canonical() + .expect("into_canonical") + .into_arrow(); // Send back a single record batch of the decoded data. - let output_batch = RecordBatch::try_new(this.output_schema.clone(), columns) - .expect("RecordBatch::try_new"); + let output_batch = RecordBatch::from(decoded.as_struct()); Poll::Ready(Some(Ok(output_batch))) } @@ -409,10 +406,16 @@ where mod test { use std::sync::Arc; - use arrow_array::{BooleanArray, RecordBatch, UInt64Array}; + use arrow_array::{RecordBatch, UInt64Array}; use arrow_schema::{DataType, Field, Schema}; use datafusion_expr::{and, col, lit}; use itertools::Itertools; + use vortex::array::bool::BoolArray; + use vortex::array::primitive::PrimitiveArray; + use vortex::array::struct_::StructArray; + use vortex::validity::Validity; + use vortex::IntoArray; + use vortex_dtype::FieldName; use crate::plans::{RowIndicesStream, ROW_SELECTOR_SCHEMA_REF}; @@ -424,20 +427,23 @@ mod test { ])); let _schema = schema.clone(); - let inner = Box::pin(async move { - Ok(RecordBatch::try_new( - _schema, + let one_shot = Box::pin(async move { + StructArray::try_new( + Arc::new([FieldName::from("a"), FieldName::from("b")]), vec![ - Arc::new(UInt64Array::from(vec![0u64, 1, 2])), - Arc::new(BooleanArray::from(vec![false, false, true])), + PrimitiveArray::from(vec![0u64, 1, 2]).into_array(), + BoolArray::from(vec![false, false, true]).into_array(), ], + 3, + Validity::NonNullable, ) - .unwrap()) + .unwrap() + .into_array() }); let _schema = schema.clone(); let filtering_stream = RowIndicesStream { - inner, + one_shot, polled_inner: false, conjunction_expr: and((col("a") % lit(2u64)).eq(lit(0u64)), col("b").is_true()), schema_ref: _schema, From 8cd7f8f061192b822551a4e3879437d39bba21b3 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Wed, 26 Jun 2024 13:14:25 -0400 Subject: [PATCH 14/16] fix bug in varbin_to_arrow --- bench-vortex/benches/datafusion_benchmark.rs | 5 ++-- vortex-array/src/array/bool/accessors.rs | 2 -- vortex-array/src/array/varbin/compute/take.rs | 2 +- vortex-array/src/canonical.rs | 3 +- vortex-array/src/stats/statsset.rs | 30 ------------------- 5 files changed, 4 insertions(+), 38 deletions(-) diff --git a/bench-vortex/benches/datafusion_benchmark.rs b/bench-vortex/benches/datafusion_benchmark.rs index 3e683d8544..73c3ced0e0 100644 --- a/bench-vortex/benches/datafusion_benchmark.rs +++ b/bench-vortex/benches/datafusion_benchmark.rs @@ -8,9 +8,8 @@ use criterion::{black_box, criterion_group, criterion_main, BenchmarkGroup, Crit use datafusion::common::Result as DFResult; use datafusion::datasource::{MemTable, TableProvider}; use datafusion::execution::memory_pool::human_readable_size; -use 
datafusion::functions_aggregate::expr_fn::sum; use datafusion::logical_expr::lit; -use datafusion::prelude::{col, DataFrame, SessionContext}; +use datafusion::prelude::{col, count_distinct, DataFrame, SessionContext}; use lazy_static::lazy_static; use vortex::compress::Compressor; use vortex::encoding::EncodingRef; @@ -87,7 +86,7 @@ fn filter_agg_query(df: DataFrame) -> DFResult { // SELECT SUM(scores) FROM table WHERE scores >= 3000 AND scores <= 4000 df.filter(col("scores").gt_eq(lit(3_000)))? .filter(col("scores").lt_eq(lit(4_000)))? - .aggregate(vec![], vec![sum(col("scores"))]) + .aggregate(vec![], vec![count_distinct(col("names"))]) } fn measure_provider( diff --git a/vortex-array/src/array/bool/accessors.rs b/vortex-array/src/array/bool/accessors.rs index e35b25f8bf..6c464f1cff 100644 --- a/vortex-array/src/array/bool/accessors.rs +++ b/vortex-array/src/array/bool/accessors.rs @@ -1,4 +1,3 @@ -use itertools::Itertools; use vortex_error::VortexResult; use crate::accessor::ArrayAccessor; @@ -22,7 +21,6 @@ impl ArrayAccessor for BoolArray { Validity::AllInvalid => Ok(f(&mut (0..self.len()).map(|_| None))), Validity::Array(valid) => { let valids = valid.into_bool()?.boolean_buffer(); - println!("nulls: {:?}", valids.iter().collect_vec()); let mut iter = valids.iter().zip(bools.iter()).map(|(is_valid, value)| { if is_valid { Some(if value { &TRUE } else { &FALSE }) diff --git a/vortex-array/src/array/varbin/compute/take.rs b/vortex-array/src/array/varbin/compute/take.rs index 16bdf8fb21..2ce2b576f1 100644 --- a/vortex-array/src/array/varbin/compute/take.rs +++ b/vortex-array/src/array/varbin/compute/take.rs @@ -49,7 +49,7 @@ fn take( return Ok(take_nullable(dtype, offsets, data, indices, v)); } - let mut builder = VarBinBuilder::::with_capacity(indices.len()); + let mut builder = VarBinBuilder::::with_capacity(indices.len()); for &idx in indices { let idx = idx.to_usize().unwrap(); let start = offsets[idx].to_usize().unwrap(); diff --git a/vortex-array/src/canonical.rs b/vortex-array/src/canonical.rs index 6fbe21b251..2d737243fb 100644 --- a/vortex-array/src/canonical.rs +++ b/vortex-array/src/canonical.rs @@ -209,8 +209,7 @@ fn struct_to_arrow(struct_array: StructArray) -> ArrayRef { fn varbin_to_arrow(varbin_array: VarBinArray) -> ArrayRef { let offsets = varbin_array .offsets() - .into_canonical() - .and_then(Canonical::into_primitive) + .into_primitive() .expect("flatten_primitive"); let offsets = match offsets.ptype() { PType::I32 | PType::I64 => offsets, diff --git a/vortex-array/src/stats/statsset.rs b/vortex-array/src/stats/statsset.rs index a2f9e85748..cc5dec36d5 100644 --- a/vortex-array/src/stats/statsset.rs +++ b/vortex-array/src/stats/statsset.rs @@ -27,36 +27,6 @@ impl StatsSet { } } - // pub fn constant(len: usize, scalar: &Scalar) -> Self { - // let mut stats = HashMap::from([ - // (Stat::Max, scalar.clone()), - // (Stat::Min, scalar.clone()), - // (Stat::IsConstant, true.into()), - // (Stat::IsSorted, true.into()), - // (Stat::RunCount, 1.into()), - // ]); - // - // match scalar.dtype() { - // DType::Bool(_) => { - // stats.insert(Stat::TrueCount, 0.into()); - // } - // DType::Primitive(ptype, _) => { - // ptype.byte_width(); - // stats.insert( - // Stat::BitWidthFreq, - // vec![0; ptype.byte_width() * 8 + 1].into(), - // ); - // stats.insert( - // Stat::TrailingZeroFreq, - // vec![ptype.byte_width() * 8; ptype.byte_width() * 8 + 1].into(), - // ); - // } - // _ => {} - // } - // - // Self::from(stats) - // } - /// Specialized constructor for the case where the StatsSet 

From df06c5fbbbb730a079c3bbecafe0265b6b08d8dc Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Wed, 26 Jun 2024 13:27:38 -0400
Subject: [PATCH 15/16] fix pytests

---
 pyvortex/test/test_array.py | 3 +--
 requirements-dev.lock       | 1 +
 requirements.lock           | 1 +
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/pyvortex/test/test_array.py b/pyvortex/test/test_array.py
index 119beb914e..0825f184e3 100644
--- a/pyvortex/test/test_array.py
+++ b/pyvortex/test/test_array.py
@@ -18,10 +18,9 @@ def test_varbin_array_round_trip():
 
 def test_varbin_array_take():
     a = vortex.encode(pa.array(["a", "b", "c", "d"]))
-    # TODO(ngates): ensure we correctly round-trip to a string and not large_string
     assert a.take(vortex.encode(pa.array([0, 2]))).to_pyarrow().combine_chunks() == pa.array(
         ["a", "c"],
-        type=pa.large_utf8(),
+        type=pa.utf8(),
     )
 
 
diff --git a/requirements-dev.lock b/requirements-dev.lock
index 69b5191d9f..c5b17d839f 100644
--- a/requirements-dev.lock
+++ b/requirements-dev.lock
@@ -6,6 +6,7 @@
 #   features: []
 #   all-features: false
 #   with-sources: false
+#   generate-hashes: false
 
 -e file:.
 -e file:pyvortex
diff --git a/requirements.lock b/requirements.lock
index bf905daba0..cb652616f8 100644
--- a/requirements.lock
+++ b/requirements.lock
@@ -6,6 +6,7 @@
 #   features: []
 #   all-features: false
 #   with-sources: false
+#   generate-hashes: false
 
 -e file:.
 -e file:pyvortex
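
The pytest change above follows from the varbin `take` fix earlier in this
series: the builder now preserves the source offset width instead of widening
it, and in Arrow a 32-bit offset means `Utf8` while a 64-bit offset means
`LargeUtf8`. A standalone sketch of that distinction using arrow-rs (not code
from this patch):

    use arrow_array::{Array, LargeStringArray, StringArray};
    use arrow_schema::DataType;

    // StringArray stores i32 offsets, so it reports DataType::Utf8...
    let small = StringArray::from(vec!["a", "c"]);
    assert_eq!(small.data_type(), &DataType::Utf8);

    // ...while LargeStringArray stores i64 offsets and reports LargeUtf8.
    let large = LargeStringArray::from(vec!["a", "c"]);
    assert_eq!(large.data_type(), &DataType::LargeUtf8);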

From aa8e787b93e9625dbc1a727dfaa1cb4ce273c1f8 Mon Sep 17 00:00:00 2001
From: Andrew Duffy
Date: Wed, 26 Jun 2024 14:01:12 -0400
Subject: [PATCH 16/16] remove unnecessary clones and things

---
 vortex-datafusion/src/lib.rs | 35 +++++++++--------------------------
 1 file changed, 9 insertions(+), 26 deletions(-)

diff --git a/vortex-datafusion/src/lib.rs b/vortex-datafusion/src/lib.rs
index fce0fc1e99..2529ecc4cf 100644
--- a/vortex-datafusion/src/lib.rs
+++ b/vortex-datafusion/src/lib.rs
@@ -145,10 +145,10 @@ impl TableProvider for VortexMemTable {
             projection
         }
 
-        let filter_exprs: Option<Vec<Expr>> = if filters.is_empty() {
+        let filter_exprs = if filters.is_empty() {
             None
         } else {
-            Some(filters.to_vec())
+            Some(filters)
         };
 
         let partitioning = if let Ok(chunked_array) = ChunkedArray::try_from(&self.array) {
@@ -168,7 +168,7 @@ impl TableProvider for VortexMemTable {
             // using the calculated indices from the filter.
             Some(filter_exprs) => {
                 let filter_projection =
-                    get_filter_projection(filter_exprs.as_slice(), self.schema_ref.clone());
+                    get_filter_projection(filter_exprs, self.schema_ref.clone());
 
                 Ok(make_filter_then_take_plan(
                     self.schema_ref.clone(),
@@ -216,21 +216,10 @@ impl TableProvider for VortexMemTable {
                 .collect());
         }
 
-        // Verify for each filter whether its expression tree consists solely of operations we know
-        // how to pushdown, and that they only reference columns in our source.
-        // TODO(aduffy): figure out if we actually need to do this, or what guarantees about the
-        // filters that DataFusion provides.
-        let schema_columns: HashSet<String> = self
-            .schema_ref
-            .fields
-            .iter()
-            .map(|field| field.name().clone())
-            .collect();
-
         filters
            .iter()
            .map(|expr| {
-                if can_be_pushed_down(expr, &schema_columns)? {
+                if can_be_pushed_down(expr)? {
                     Ok(TableProviderFilterPushDown::Exact)
                 } else {
                     Ok(TableProviderFilterPushDown::Unsupported)
@@ -249,7 +238,7 @@ impl TableProvider for VortexMemTable {
 /// columns.
 fn make_filter_then_take_plan(
     schema: SchemaRef,
-    filter_exprs: Vec<Expr>,
+    filter_exprs: &[Expr],
     filter_projection: Vec<usize>,
     array: Array,
     output_projection: Vec<usize>,
@@ -261,7 +250,7 @@ fn make_filter_then_take_plan(
         .project(filter_projection.as_slice())
         .expect("projecting filter struct");
 
-    let row_selector_op = Arc::new(RowSelectorExec::new(&filter_exprs, &filter_struct));
+    let row_selector_op = Arc::new(RowSelectorExec::new(filter_exprs, &filter_struct));
 
     Arc::new(TakeRowsExec::new(
         schema.clone(),
         &output_projection,
         row_selector_op.clone(),
         &array,
     ))
 }
 
 /// Check if the given expression tree can be pushed down into the scan.
-fn can_be_pushed_down(expr: &Expr, schema_columns: &HashSet<String>) -> DFResult<bool> {
+fn can_be_pushed_down(expr: &Expr) -> DFResult<bool> {
     // If the filter references a column not known to our schema, we reject the filter for pushdown.
-    // TODO(aduffy): is this necessary? Under what conditions would this happen?
-    let column_refs = get_column_references(expr);
-    if !column_refs.is_subset(schema_columns) {
-        return Ok(false);
-    }
-
     fn is_supported(expr: &Expr) -> bool {
         match expr {
             Expr::BinaryExpr(binary_expr) => {
@@ -383,7 +366,7 @@ impl DisplayAs for VortexScanExec {
 /// This function will return an Error if `array` is not struct-typed. It will also return an
 /// error if the projection references columns
 fn execute_unfiltered(
-    array: &Array,
+    array: Array,
     projection: &Vec<usize>,
 ) -> DFResult<SendableRecordBatchStream> {
     // Construct the RecordBatch by flattening each struct field and transmuting to an ArrayRef.
@@ -483,7 +466,7 @@ impl ExecutionPlan for VortexScanExec {
             self.array.clone()
         };
 
-        execute_unfiltered(chunk, &self.scan_projection)
     }
 }
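
Taken together, the post-patch planning path for a filtered scan composes the
helpers roughly as below. The names and signatures come from the hunks above;
the surrounding glue is paraphrased and assumes the parameter list ends where
the hunk context ends, so treat this as a sketch rather than verbatim source:

    // Columns referenced by the filters are projected first. RowSelectorExec
    // evaluates the pushed-down filters over that narrow projection to
    // compute matching row indices, and TakeRowsExec then gathers the output
    // projection by those indices.
    let filter_projection = get_filter_projection(filters, schema.clone());
    let plan = make_filter_then_take_plan(
        schema.clone(),
        filters,           // borrowed as &[Expr]; no per-scan Vec clone
        filter_projection,
        array.clone(),
        output_projection,
    );

Borrowing `&[Expr]` here, and passing the `Array` by value into
`execute_unfiltered`, is what lets this patch drop the `to_vec()` and the
extra reference-then-clone round trips the previous revision needed.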