From f762dcfc22ddd2c6337ace8e7eb574ae5667dc31 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sun, 7 Jul 2024 05:52:02 +0100 Subject: [PATCH 1/6] Demonstrate unions can't be null --- datafusion/core/tests/dataframe/mod.rs | 123 ++++++++++++++++++++++++- 1 file changed, 121 insertions(+), 2 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index e46a92e92818..53fb9d190b7e 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -29,8 +29,9 @@ use arrow::{ }, record_batch::RecordBatch, }; -use arrow_array::Float32Array; -use arrow_schema::ArrowError; +use arrow_array::{Array, Float32Array, Float64Array, UnionArray}; +use arrow_buffer::ScalarBuffer; +use arrow_schema::{ArrowError, UnionFields, UnionMode}; use datafusion_functions_aggregate::count::count_udaf; use object_store::local::LocalFileSystem; use std::fs; @@ -2195,3 +2196,121 @@ async fn write_parquet_results() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn sparse_union_is_null() { + // union of [{A=1}, null, {B=3.2}, {A=34}] + let int_array = Int32Array::from(vec![Some(1), None, None, Some(34)]); + let float_array = Float64Array::from(vec![None, None, Some(3.2), None]); + let type_ids = [0, 0, 1, 0].into_iter().collect::>(); + + let union_fields = [ + (0, Arc::new(Field::new("A", DataType::Int32, false))), + (1, Arc::new(Field::new("B", DataType::Float64, false))), + ] + .into_iter() + .collect::(); + + let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; + + let array = + UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "my_union", + DataType::Union(union_fields, UnionMode::Sparse), + false, + )])); + + let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); + + let ctx = SessionContext::new(); + + ctx.register_batch("union_batch", batch).unwrap(); + + let df = ctx.table("union_batch").await.unwrap(); + + // view_all + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {A=34} |", + "| {A=} |", + "| {B=3.2} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap()); + + // filter where is null + let result_df = df.filter(col("my_union").is_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); +} + +#[tokio::test] +async fn dense_union_is_null() { + // union of [{A=1}, null, {B=3.2}, {A=34}] + let int_array = Int32Array::from(vec![Some(1), None, Some(34)]); + let float_array = Float64Array::from(vec![3.2]); + let type_ids = [0, 0, 1, 0].into_iter().collect::>(); + let offsets = [0, 1, 0, 2].into_iter().collect::>(); + + let union_fields = [ + (0, Arc::new(Field::new("A", DataType::Int32, false))), + (1, Arc::new(Field::new("B", DataType::Float64, false))), + ] + .into_iter() + .collect::(); + + let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; + + let array = + UnionArray::try_new(union_fields.clone(), type_ids, Some(offsets), children) + .unwrap(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "my_union", + DataType::Union(union_fields, UnionMode::Dense), + false, + )])); + + let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); + + let ctx = SessionContext::new(); + + ctx.register_batch("union_batch", batch).unwrap(); + + let df = ctx.table("union_batch").await.unwrap(); + + // view_all + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {A=34} |", + "| {A=} |", + "| {B=3.2} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap()); + + // filter where is null + let result_df = df.filter(col("my_union").is_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); +} From 6e2ed65f98b1b512a93f17ef8b57d2a002393525 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sun, 7 Jul 2024 06:04:28 +0100 Subject: [PATCH 2/6] add scalar test cases --- datafusion/core/tests/dataframe/mod.rs | 54 ++++++++++++++++---------- 1 file changed, 34 insertions(+), 20 deletions(-) diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 53fb9d190b7e..66896a9cd771 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2197,6 +2197,15 @@ async fn write_parquet_results() -> Result<()> { Ok(()) } +fn union_fields() -> UnionFields { + [ + (0, Arc::new(Field::new("A", DataType::Int32, false))), + (1, Arc::new(Field::new("B", DataType::Float64, false))), + ] + .into_iter() + .collect() +} + #[tokio::test] async fn sparse_union_is_null() { // union of [{A=1}, null, {B=3.2}, {A=34}] @@ -2204,21 +2213,13 @@ async fn sparse_union_is_null() { let float_array = Float64Array::from(vec![None, None, Some(3.2), None]); let type_ids = [0, 0, 1, 0].into_iter().collect::>(); - let union_fields = [ - (0, Arc::new(Field::new("A", DataType::Int32, false))), - (1, Arc::new(Field::new("B", DataType::Float64, false))), - ] - .into_iter() - .collect::(); - let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; - let array = - UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap(); + let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); let schema = Arc::new(Schema::new(vec![Field::new( "my_union", - DataType::Union(union_fields, UnionMode::Sparse), + DataType::Union(union_fields(), UnionMode::Sparse), false, )])); @@ -2263,22 +2264,14 @@ async fn dense_union_is_null() { let type_ids = [0, 0, 1, 0].into_iter().collect::>(); let offsets = [0, 1, 0, 2].into_iter().collect::>(); - let union_fields = [ - (0, Arc::new(Field::new("A", DataType::Int32, false))), - (1, Arc::new(Field::new("B", DataType::Float64, false))), - ] - .into_iter() - .collect::(); - let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; let array = - UnionArray::try_new(union_fields.clone(), type_ids, Some(offsets), children) - .unwrap(); + UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap(); let schema = Arc::new(Schema::new(vec![Field::new( "my_union", - DataType::Union(union_fields, UnionMode::Dense), + DataType::Union(union_fields(), UnionMode::Dense), false, )])); @@ -2314,3 +2307,24 @@ async fn dense_union_is_null() { ]; assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); } + +/// these should definitely be moved somewhere else, but I'm just adding it here for simplicity now +#[tokio::test] +async fn sparse_union_is_null_scalar() { + let sparse_scalar = ScalarValue::Union( + Some((0_i8, Box::new(ScalarValue::Int32(None)))), + union_fields(), + UnionMode::Sparse, + ); + assert!(sparse_scalar.is_null()); +} + +#[tokio::test] +async fn dense_union_is_null_scalar() { + let dense_scalar = ScalarValue::Union( + Some((0_i8, Box::new(ScalarValue::Int32(None)))), + union_fields(), + UnionMode::Dense, + ); + assert!(dense_scalar.is_null()); +} From 53cc9a0f06c2a5b10317b924fd2af8504c11d3ff Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sun, 7 Jul 2024 19:08:01 +0100 Subject: [PATCH 3/6] support "IS NULL" and "IS NOT NULL" on unions --- datafusion/common/src/scalar/mod.rs | 34 ++++- datafusion/core/tests/dataframe/mod.rs | 116 ++++++++++------ .../src/expressions/is_not_null.rs | 23 +++- .../physical-expr/src/expressions/is_null.rs | 125 +++++++++++++++++- 4 files changed, 247 insertions(+), 51 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 55ce76c4b939..26e03a3b9893 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -1459,7 +1459,10 @@ impl ScalarValue { ScalarValue::DurationMillisecond(v) => v.is_none(), ScalarValue::DurationMicrosecond(v) => v.is_none(), ScalarValue::DurationNanosecond(v) => v.is_none(), - ScalarValue::Union(v, _, _) => v.is_none(), + ScalarValue::Union(v, _, _) => match v { + Some((_, s)) => s.is_null(), + None => true, + }, ScalarValue::Dictionary(_, v) => v.is_null(), } } @@ -6514,4 +6517,33 @@ mod tests { } intervals } + + fn union_fields() -> UnionFields { + [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + ] + .into_iter() + .collect() + } + + #[test] + fn sparse_scalar_union_is_null() { + let sparse_scalar = ScalarValue::Union( + Some((0_i8, Box::new(ScalarValue::Int32(None)))), + union_fields(), + UnionMode::Sparse, + ); + assert!(sparse_scalar.is_null()); + } + + #[test] + fn dense_scalar_union_is_null() { + let dense_scalar = ScalarValue::Union( + Some((0_i8, Box::new(ScalarValue::Int32(None)))), + union_fields(), + UnionMode::Dense, + ); + assert!(dense_scalar.is_null()); + } } diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 66896a9cd771..2d1904d9e166 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2199,8 +2199,9 @@ async fn write_parquet_results() -> Result<()> { fn union_fields() -> UnionFields { [ - (0, Arc::new(Field::new("A", DataType::Int32, false))), - (1, Arc::new(Field::new("B", DataType::Float64, false))), + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + (2, Arc::new(Field::new("C", DataType::Utf8, true))), ] .into_iter() .collect() @@ -2208,20 +2209,26 @@ fn union_fields() -> UnionFields { #[tokio::test] async fn sparse_union_is_null() { - // union of [{A=1}, null, {B=3.2}, {A=34}] - let int_array = Int32Array::from(vec![Some(1), None, None, Some(34)]); - let float_array = Float64Array::from(vec![None, None, Some(3.2), None]); - let type_ids = [0, 0, 1, 0].into_iter().collect::>(); - - let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; + // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] + let int_array = Int32Array::from(vec![Some(1), None, None, None, None, None]); + let float_array = Float64Array::from(vec![None, None, Some(3.2), None, None, None]); + let str_array = StringArray::from(vec![None, None, None, None, Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); - let schema = Arc::new(Schema::new(vec![Field::new( + let field = Field::new( "my_union", DataType::Union(union_fields(), UnionMode::Sparse), - false, - )])); + true, + ); + let schema = Arc::new(Schema::new(vec![field])); let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); @@ -2237,20 +2244,37 @@ async fn sparse_union_is_null() { "| my_union |", "+----------+", "| {A=1} |", - "| {A=34} |", "| {A=} |", "| {B=3.2} |", + "| {B=} |", + "| {C=a} |", + "| {C=} |", "+----------+", ]; assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap()); // filter where is null - let result_df = df.filter(col("my_union").is_null()).unwrap(); + let result_df = df.clone().filter(col("my_union").is_null()).unwrap(); let expected = [ "+----------+", "| my_union |", "+----------+", "| {A=} |", + "| {B=} |", + "| {C=} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); + + // filter where is not null + let result_df = df.filter(col("my_union").is_not_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {B=3.2} |", + "| {C=a} |", "+----------+", ]; assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); @@ -2259,21 +2283,29 @@ async fn sparse_union_is_null() { #[tokio::test] async fn dense_union_is_null() { // union of [{A=1}, null, {B=3.2}, {A=34}] - let int_array = Int32Array::from(vec![Some(1), None, Some(34)]); - let float_array = Float64Array::from(vec![3.2]); - let type_ids = [0, 0, 1, 0].into_iter().collect::>(); - let offsets = [0, 1, 0, 2].into_iter().collect::>(); - - let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; + let int_array = Int32Array::from(vec![Some(1), None]); + let float_array = Float64Array::from(vec![Some(3.2), None]); + let str_array = StringArray::from(vec![Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + let offsets = [0, 1, 0, 1, 0, 1] + .into_iter() + .collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; let array = UnionArray::try_new(union_fields(), type_ids, Some(offsets), children).unwrap(); - let schema = Arc::new(Schema::new(vec![Field::new( + let field = Field::new( "my_union", DataType::Union(union_fields(), UnionMode::Dense), - false, - )])); + true, + ); + let schema = Arc::new(Schema::new(vec![field])); let batch = RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap(); @@ -2289,42 +2321,38 @@ async fn dense_union_is_null() { "| my_union |", "+----------+", "| {A=1} |", - "| {A=34} |", "| {A=} |", "| {B=3.2} |", + "| {B=} |", + "| {C=a} |", + "| {C=} |", "+----------+", ]; assert_batches_sorted_eq!(expected, &df.clone().collect().await.unwrap()); // filter where is null - let result_df = df.filter(col("my_union").is_null()).unwrap(); + let result_df = df.clone().filter(col("my_union").is_null()).unwrap(); let expected = [ "+----------+", "| my_union |", "+----------+", "| {A=} |", + "| {B=} |", + "| {C=} |", "+----------+", ]; assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); -} - -/// these should definitely be moved somewhere else, but I'm just adding it here for simplicity now -#[tokio::test] -async fn sparse_union_is_null_scalar() { - let sparse_scalar = ScalarValue::Union( - Some((0_i8, Box::new(ScalarValue::Int32(None)))), - union_fields(), - UnionMode::Sparse, - ); - assert!(sparse_scalar.is_null()); -} -#[tokio::test] -async fn dense_union_is_null_scalar() { - let dense_scalar = ScalarValue::Union( - Some((0_i8, Box::new(ScalarValue::Int32(None)))), - union_fields(), - UnionMode::Dense, - ); - assert!(dense_scalar.is_null()); + // filter where is not null + let result_df = df.filter(col("my_union").is_not_null()).unwrap(); + let expected = [ + "+----------+", + "| my_union |", + "+----------+", + "| {A=1} |", + "| {B=3.2} |", + "| {C=a} |", + "+----------+", + ]; + assert_batches_sorted_eq!(expected, &result_df.collect().await.unwrap()); } diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index d8fa77585b5d..aff087d75344 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -20,6 +20,7 @@ use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; +use crate::expressions::is_null::union_is_null; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; use arrow::compute; @@ -27,6 +28,7 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; +use arrow_array::{BooleanArray, UnionArray}; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; @@ -73,9 +75,16 @@ impl PhysicalExpr for IsNotNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( - compute::is_not_null(array.as_ref())?, - ))), + ColumnarValue::Array(array) => { + let bool_array = if let Some(union_array) = + array.as_any().downcast_ref::() + { + union_is_not_null(union_array)? + } else { + compute::is_not_null(array.as_ref())? + }; + Ok(ColumnarValue::Array(Arc::new(bool_array))) + } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(!scalar.is_null())), )), @@ -112,6 +121,14 @@ pub fn is_not_null(arg: Arc) -> Result> Ok(Arc::new(IsNotNullExpr::new(arg))) } +fn union_is_not_null(union_array: &UnionArray) -> Result { + union_is_null(union_array).map(|is_null| { + compute::not(&is_null) + .expect("Failed to compute is not null") + .into() + }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index 41becafde6de..c8f270a65137 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -25,6 +25,9 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; +use arrow_array::{Array, BooleanArray, Int8Array, UnionArray}; +use arrow_buffer::{BooleanBuffer, ScalarBuffer}; +use arrow_ord::cmp; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; @@ -74,9 +77,16 @@ impl PhysicalExpr for IsNullExpr { fn evaluate(&self, batch: &RecordBatch) -> Result { let arg = self.arg.evaluate(batch)?; match arg { - ColumnarValue::Array(array) => Ok(ColumnarValue::Array(Arc::new( - compute::is_null(array.as_ref())?, - ))), + ColumnarValue::Array(array) => { + let bool_array = if let Some(union_array) = + array.as_any().downcast_ref::() + { + union_is_null(union_array)? + } else { + compute::is_null(array.as_ref())? + }; + Ok(ColumnarValue::Array(Arc::new(bool_array))) + } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(scalar.is_null())), )), @@ -100,6 +110,51 @@ impl PhysicalExpr for IsNullExpr { } } +pub(crate) fn union_is_null(union_array: &UnionArray) -> Result { + if let Some(offsets) = union_array.offsets() { + dense_union_is_null(union_array, offsets) + } else { + sparce_union_is_null(union_array) + } +} + +fn dense_union_is_null( + union_array: &UnionArray, + offsets: &ScalarBuffer, +) -> Result { + let child_arrays = (0..union_array.type_names().len()) + .map(|type_id| { + compute::is_null(&union_array.child(type_id as i8)).map_err(Into::into) + }) + .collect::>>()?; + + let buffer: BooleanBuffer = offsets + .iter() + .zip(union_array.type_ids()) + .map(|(offset, type_id)| { + (&child_arrays[*type_id as usize]).value(*offset as usize) + }) + .collect(); + + Ok(BooleanArray::new(buffer, None)) +} + +fn sparce_union_is_null(union_array: &UnionArray) -> Result { + let type_ids = Int8Array::new(union_array.type_ids().clone(), None); + + let mut union_is_null = + BooleanArray::new(BooleanBuffer::new_unset(union_array.len()), None); + for type_id in 0..union_array.type_names().len() { + let type_id = type_id as i8; + let union_is_child = cmp::eq(&type_ids, &Int8Array::new_scalar(type_id))?; + let child = union_array.child(type_id); + let child_array_is_null = compute::is_null(&child)?; + let child_is_null = compute::and(&union_is_child, &child_array_is_null)?; + union_is_null = compute::or(&union_is_null, &child_is_null)?; + } + Ok(union_is_null) +} + impl PartialEq for IsNullExpr { fn eq(&self, other: &dyn Any) -> bool { down_cast_any_ref(other) @@ -108,6 +163,7 @@ impl PartialEq for IsNullExpr { .unwrap_or(false) } } + /// Create an IS NULL expression pub fn is_null(arg: Arc) -> Result> { Ok(Arc::new(IsNullExpr::new(arg))) @@ -121,6 +177,8 @@ mod tests { array::{BooleanArray, StringArray}, datatypes::*, }; + use arrow_array::{Float64Array, Int32Array}; + use arrow_buffer::ScalarBuffer; use datafusion_common::cast::as_boolean_array; #[test] @@ -145,4 +203,65 @@ mod tests { Ok(()) } + + fn union_fields() -> UnionFields { + [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + (2, Arc::new(Field::new("C", DataType::Utf8, true))), + ] + .into_iter() + .collect() + } + + #[test] + fn sparse_union_is_null() { + // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] + let int_array = Int32Array::from(vec![Some(1), None, None, None, None, None]); + let float_array = + Float64Array::from(vec![None, None, Some(3.2), None, None, None]); + let str_array = StringArray::from(vec![None, None, None, None, Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = + UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); + + let result = union_is_null(&array).unwrap(); + + let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); + assert_eq!(expected, &result); + } + + #[test] + fn dense_union_is_null() { + // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] + let int_array = Int32Array::from(vec![Some(1), None]); + let float_array = Float64Array::from(vec![Some(3.2), None]); + let str_array = StringArray::from(vec![Some("a"), None]); + let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + let offsets = [0, 1, 0, 1, 0, 1] + .into_iter() + .collect::>(); + + let children = vec![ + Arc::new(int_array) as Arc, + Arc::new(float_array), + Arc::new(str_array), + ]; + + let array = + UnionArray::try_new(union_fields(), type_ids, Some(offsets), children) + .unwrap(); + + let result = union_is_null(&array).unwrap(); + + let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); + assert_eq!(expected, &result); + } } From fa015ef52d3cc53429e98c20715c4fdd376e20d8 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Sun, 7 Jul 2024 19:32:15 +0100 Subject: [PATCH 4/6] formatting --- datafusion/physical-expr/src/expressions/is_not_null.rs | 8 ++------ datafusion/physical-expr/src/expressions/is_null.rs | 4 +--- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index aff087d75344..393c45afcd37 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -20,7 +20,6 @@ use std::hash::{Hash, Hasher}; use std::{any::Any, sync::Arc}; -use crate::expressions::is_null::union_is_null; use crate::physical_expr::down_cast_any_ref; use crate::PhysicalExpr; use arrow::compute; @@ -122,11 +121,8 @@ pub fn is_not_null(arg: Arc) -> Result> } fn union_is_not_null(union_array: &UnionArray) -> Result { - union_is_null(union_array).map(|is_null| { - compute::not(&is_null) - .expect("Failed to compute is not null") - .into() - }) + super::is_null::union_is_null(union_array) + .map(|is_null| compute::not(&is_null).expect("Failed to compute is not null")) } #[cfg(test)] diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index c8f270a65137..d081294c5236 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -131,9 +131,7 @@ fn dense_union_is_null( let buffer: BooleanBuffer = offsets .iter() .zip(union_array.type_ids()) - .map(|(offset, type_id)| { - (&child_arrays[*type_id as usize]).value(*offset as usize) - }) + .map(|(offset, type_id)| child_arrays[*type_id as usize].value(*offset as usize)) .collect(); Ok(BooleanArray::new(buffer, None)) From db72566e8ffdfbb3065252fdab23a5fdf30cb4c3 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Mon, 8 Jul 2024 09:57:00 +0100 Subject: [PATCH 5/6] fix comments from @alamb --- .../src/expressions/is_not_null.rs | 63 ++++++++++++++----- .../physical-expr/src/expressions/is_null.rs | 50 ++++++++------- 2 files changed, 77 insertions(+), 36 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_not_null.rs b/datafusion/physical-expr/src/expressions/is_not_null.rs index 393c45afcd37..9f7438d13e05 100644 --- a/datafusion/physical-expr/src/expressions/is_not_null.rs +++ b/datafusion/physical-expr/src/expressions/is_not_null.rs @@ -27,7 +27,6 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow_array::{BooleanArray, UnionArray}; use datafusion_common::Result; use datafusion_common::ScalarValue; use datafusion_expr::ColumnarValue; @@ -75,14 +74,9 @@ impl PhysicalExpr for IsNotNullExpr { let arg = self.arg.evaluate(batch)?; match arg { ColumnarValue::Array(array) => { - let bool_array = if let Some(union_array) = - array.as_any().downcast_ref::() - { - union_is_not_null(union_array)? - } else { - compute::is_not_null(array.as_ref())? - }; - Ok(ColumnarValue::Array(Arc::new(bool_array))) + let is_null = super::is_null::compute_is_null(array)?; + let is_not_null = compute::not(&is_null)?; + Ok(ColumnarValue::Array(Arc::new(is_not_null))) } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(!scalar.is_null())), @@ -120,11 +114,6 @@ pub fn is_not_null(arg: Arc) -> Result> Ok(Arc::new(IsNotNullExpr::new(arg))) } -fn union_is_not_null(union_array: &UnionArray) -> Result { - super::is_null::union_is_null(union_array) - .map(|is_null| compute::not(&is_null).expect("Failed to compute is not null")) -} - #[cfg(test)] mod tests { use super::*; @@ -133,6 +122,8 @@ mod tests { array::{BooleanArray, StringArray}, datatypes::*, }; + use arrow_array::{Array, Float64Array, Int32Array, UnionArray}; + use arrow_buffer::ScalarBuffer; use datafusion_common::cast::as_boolean_array; #[test] @@ -156,4 +147,48 @@ mod tests { Ok(()) } + + #[test] + fn union_is_not_null_op() { + // union of [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}] + let int_array = Int32Array::from(vec![Some(1), None, None, None, None]); + let float_array = + Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None]); + let type_ids = [0, 0, 1, 1, 1].into_iter().collect::>(); + + let children = vec![Arc::new(int_array) as Arc, Arc::new(float_array)]; + + let union_fields: UnionFields = [ + (0, Arc::new(Field::new("A", DataType::Int32, true))), + (1, Arc::new(Field::new("B", DataType::Float64, true))), + ] + .into_iter() + .collect(); + + let array = + UnionArray::try_new(union_fields.clone(), type_ids, None, children).unwrap(); + + let field = Field::new( + "my_union", + DataType::Union(union_fields, UnionMode::Sparse), + true, + ); + + let schema = Schema::new(vec![field]); + let expr = is_not_null(col("my_union", &schema).unwrap()).unwrap(); + let batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)]).unwrap(); + + // expression: "a is not null" + let actual = expr + .evaluate(&batch) + .unwrap() + .into_array(batch.num_rows()) + .expect("Failed to convert to array"); + let actual = as_boolean_array(&actual).unwrap(); + + let expected = &BooleanArray::from(vec![true, false, true, true, false]); + + assert_eq!(expected, actual); + } } diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index d081294c5236..acdb3c0cd85e 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -25,7 +25,7 @@ use arrow::{ datatypes::{DataType, Schema}, record_batch::RecordBatch, }; -use arrow_array::{Array, BooleanArray, Int8Array, UnionArray}; +use arrow_array::{Array, ArrayRef, BooleanArray, Int8Array, UnionArray}; use arrow_buffer::{BooleanBuffer, ScalarBuffer}; use arrow_ord::cmp; @@ -78,14 +78,7 @@ impl PhysicalExpr for IsNullExpr { let arg = self.arg.evaluate(batch)?; match arg { ColumnarValue::Array(array) => { - let bool_array = if let Some(union_array) = - array.as_any().downcast_ref::() - { - union_is_null(union_array)? - } else { - compute::is_null(array.as_ref())? - }; - Ok(ColumnarValue::Array(Arc::new(bool_array))) + Ok(ColumnarValue::Array(Arc::new(compute_is_null(array)?))) } ColumnarValue::Scalar(scalar) => Ok(ColumnarValue::Scalar( ScalarValue::Boolean(Some(scalar.is_null())), @@ -110,11 +103,17 @@ impl PhysicalExpr for IsNullExpr { } } -pub(crate) fn union_is_null(union_array: &UnionArray) -> Result { - if let Some(offsets) = union_array.offsets() { - dense_union_is_null(union_array, offsets) +/// workaround https://github.com/apache/arrow-rs/issues/6017, +/// this can be removed once `arrow::compute::is_null` is fixed +pub(crate) fn compute_is_null(array: ArrayRef) -> Result { + if let Some(union_array) = array.as_any().downcast_ref::() { + if let Some(offsets) = union_array.offsets() { + dense_union_is_null(union_array, offsets) + } else { + sparse_union_is_null(union_array) + } } else { - sparce_union_is_null(union_array) + compute::is_null(array.as_ref()).map_err(Into::into) } } @@ -137,7 +136,7 @@ fn dense_union_is_null( Ok(BooleanArray::new(buffer, None)) } -fn sparce_union_is_null(union_array: &UnionArray) -> Result { +fn sparse_union_is_null(union_array: &UnionArray) -> Result { let type_ids = Int8Array::new(union_array.type_ids().clone(), None); let mut union_is_null = @@ -214,12 +213,16 @@ mod tests { #[test] fn sparse_union_is_null() { - // union of [{A=1}, {A=}, {B=3.2}, {B=}, {C="a"}, {C=}] - let int_array = Int32Array::from(vec![Some(1), None, None, None, None, None]); + // union of [{A=1}, {A=}, {B=1.1}, {B=1.2}, {B=}, {C=}, {C="a"}] + let int_array = + Int32Array::from(vec![Some(1), None, None, None, None, None, None]); let float_array = - Float64Array::from(vec![None, None, Some(3.2), None, None, None]); - let str_array = StringArray::from(vec![None, None, None, None, Some("a"), None]); - let type_ids = [0, 0, 1, 1, 2, 2].into_iter().collect::>(); + Float64Array::from(vec![None, None, Some(1.1), Some(1.2), None, None, None]); + let str_array = + StringArray::from(vec![None, None, None, None, None, None, Some("a")]); + let type_ids = [0, 0, 1, 1, 1, 2, 2] + .into_iter() + .collect::>(); let children = vec![ Arc::new(int_array) as Arc, @@ -230,9 +233,11 @@ mod tests { let array = UnionArray::try_new(union_fields(), type_ids, None, children).unwrap(); - let result = union_is_null(&array).unwrap(); + let array_ref = Arc::new(array) as ArrayRef; + let result = compute_is_null(array_ref).unwrap(); - let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); + let expected = + &BooleanArray::from(vec![false, true, false, false, true, true, false]); assert_eq!(expected, &result); } @@ -257,7 +262,8 @@ mod tests { UnionArray::try_new(union_fields(), type_ids, Some(offsets), children) .unwrap(); - let result = union_is_null(&array).unwrap(); + let array_ref = Arc::new(array) as ArrayRef; + let result = compute_is_null(array_ref).unwrap(); let expected = &BooleanArray::from(vec![false, true, false, true, false, true]); assert_eq!(expected, &result); From 229255d4ea2156e86321059ffd639a487ccbd235 Mon Sep 17 00:00:00 2001 From: Samuel Colvin Date: Mon, 8 Jul 2024 10:30:06 +0100 Subject: [PATCH 6/6] fix docstring --- datafusion/physical-expr/src/expressions/is_null.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/is_null.rs b/datafusion/physical-expr/src/expressions/is_null.rs index acdb3c0cd85e..e2dc941e26bc 100644 --- a/datafusion/physical-expr/src/expressions/is_null.rs +++ b/datafusion/physical-expr/src/expressions/is_null.rs @@ -103,8 +103,8 @@ impl PhysicalExpr for IsNullExpr { } } -/// workaround https://github.com/apache/arrow-rs/issues/6017, -/// this can be removed once `arrow::compute::is_null` is fixed +/// workaround , +/// this can be replaced with a direct call to `arrow::compute::is_null` once it's fixed. pub(crate) fn compute_is_null(array: ArrayRef) -> Result { if let Some(union_array) = array.as_any().downcast_ref::() { if let Some(offsets) = union_array.offsets() {