From ed1a84650527701eee7d3124aee770bfb4cdac12 Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Fri, 2 Aug 2024 23:49:41 -0700
Subject: [PATCH 1/7] fix: Optimize rpad

---
 native/spark-expr/src/scalar_funcs.rs | 49 ++++++++++++++++-----------
 1 file changed, 29 insertions(+), 20 deletions(-)

diff --git a/native/spark-expr/src/scalar_funcs.rs b/native/spark-expr/src/scalar_funcs.rs
index 7cbaf12aa..7cb163905 100644
--- a/native/spark-expr/src/scalar_funcs.rs
+++ b/native/spark-expr/src/scalar_funcs.rs
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{cmp::min, sync::Arc};
-
 use arrow::{
     array::{
-        ArrayRef, AsArray, Decimal128Builder, Float32Array, Float64Array, GenericStringArray,
-        Int16Array, Int32Array, Int64Array, Int64Builder, Int8Array, OffsetSizeTrait,
+        ArrayRef, AsArray, Decimal128Builder, Float32Array, Float64Array, Int16Array, Int32Array,
+        Int64Array, Int64Builder, Int8Array, OffsetSizeTrait,
     },
     datatypes::{validate_decimal_precision, Decimal128Type, Int64Type},
 };
+use arrow_array::builder::GenericStringBuilder;
 use arrow_array::{Array, ArrowNativeTypeOp, BooleanArray, Decimal128Array};
 use arrow_schema::{DataType, DECIMAL128_MAX_PRECISION};
 use datafusion::{functions::math::round::round, physical_plan::ColumnarValue};
@@ -35,6 +34,8 @@ use num::{
     integer::{div_ceil, div_floor},
     BigInt, Signed, ToPrimitive,
 };
+use std::fmt::Write;
+use std::{cmp::min, sync::Arc};
 use unicode_segmentation::UnicodeSegmentation;
 
 mod unhex;
@@ -390,7 +391,7 @@ pub fn spark_round(
 pub fn spark_rpad(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
     match args {
         [ColumnarValue::Array(array), ColumnarValue::Scalar(ScalarValue::Int32(Some(length)))] => {
-            match args[0].data_type() {
+            match array.data_type() {
                 DataType::Utf8 => spark_rpad_internal::<i32>(array, *length),
                 DataType::LargeUtf8 => spark_rpad_internal::<i64>(array, *length),
                 // TODO: handle Dictionary types
@@ -410,29 +411,37 @@ fn spark_rpad_internal<T: OffsetSizeTrait>(
     length: i32,
 ) -> Result<ColumnarValue, DataFusionError> {
     let string_array = as_generic_string_array::<T>(array)?;
+    let length = 0.max(length) as usize;
+    let empty_str = "";
+    let space_string = " ".repeat(length);
+
+    let mut builder =
+        GenericStringBuilder::<T>::with_capacity(string_array.len(), string_array.len() * length);
 
-    let result = string_array
-        .iter()
-        .map(|string| match string {
+    for string in string_array.iter() {
+        match string {
             Some(string) => {
-                let length = if length < 0 { 0 } else { length as usize };
                 if length == 0 {
-                    Ok(Some("".to_string()))
+                    builder.append_value(empty_str);
+                } else if length == 1 && string.len() > 0 {
+                    // Special case: when length == 1, there is no need to compute the expensive grapheme count
+                    builder.append_value(string);
                 } else {
-                    let graphemes = string.graphemes(true).collect::<Vec<&str>>();
-                    if length < graphemes.len() {
-                        Ok(Some(string.to_string()))
+                    let graphemes_len = string.graphemes(true).count();
+                    if length <= graphemes_len {
+                        builder.append_value(string);
                     } else {
-                        let mut s = string.to_string();
-                        s.push_str(" ".repeat(length - graphemes.len()).as_str());
-                        Ok(Some(s))
+                        // write_str updates only the value buffer, not the null or offset buffers
+                        // This is convenient for concatenating strings
+                        builder.write_str(string)?;
+                        builder.append_value(&space_string[graphemes_len..]);
                     }
                 }
             }
-            _ => Ok(None),
-        })
-        .collect::<Result<GenericStringArray<T>, DataFusionError>>()?;
-    Ok(ColumnarValue::Array(Arc::new(result)))
+            _ => builder.append_null(),
+        }
+    }
+    Ok(ColumnarValue::Array(Arc::new(builder.finish())))
 }
 
 // Let Decimal(p3, s3) be the return type, i.e. Decimal(p1, s1) / Decimal(p2, s2) = Decimal(p3, s3).
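The rewrite above leans on the fact that GenericStringBuilder implements std::fmt::Write: write_str appends bytes to the value buffer without finishing the row, and the following append_value closes it, so a padded value is assembled in place with no per-row String allocation. A minimal standalone sketch of the technique (illustrative only; pad_array is a hypothetical helper, and it counts chars for brevity where this first patch still counts graphemes):

use arrow_array::builder::GenericStringBuilder;
use arrow_array::StringArray;
use std::fmt::Write;

/// Hypothetical helper: pad each value to `length` chars, never truncating.
fn pad_array(values: &[Option<&str>], length: usize) -> StringArray {
    // One shared allocation of pad characters, sliced per row.
    let space_string = " ".repeat(length);
    let mut builder =
        GenericStringBuilder::<i32>::with_capacity(values.len(), values.len() * length);
    for value in values {
        match value {
            Some(s) => {
                let n = s.chars().count();
                if length <= n {
                    builder.append_value(s);
                } else {
                    // write_str extends only the value buffer; the following
                    // append_value closes the row, so both writes form one value.
                    builder.write_str(s).unwrap();
                    // The pad string is all ASCII, so indexing by char count is safe.
                    builder.append_value(&space_string[n..]);
                }
            }
            None => builder.append_null(),
        }
    }
    builder.finish()
}

fn main() {
    let out = pad_array(&[Some("ab"), None, Some("hello")], 4);
    assert_eq!(out.value(0), "ab  ");
    assert!(out.is_null(1));
    assert_eq!(out.value(2), "hello"); // longer than `length`: left as is
}

The single space_string allocation is shared by every row; each row borrows only the suffix it needs.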
From efc62865ae9d66c854b2b9839eb3b085bdee642c Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Sat, 3 Aug 2024 03:16:45 -0700
Subject: [PATCH 2/7] fix: Optimize rpad

---
 native/spark-expr/src/scalar_funcs.rs         | 23 +++++++------------
 .../tpcds-micro-benchmarks/char_type.sql      |  7 ++++++
 .../benchmark/CometTPCDSMicroBenchmark.scala  |  1 +
 3 files changed, 16 insertions(+), 15 deletions(-)
 create mode 100644 spark/src/test/resources/tpcds-micro-benchmarks/char_type.sql

diff --git a/native/spark-expr/src/scalar_funcs.rs b/native/spark-expr/src/scalar_funcs.rs
index 7cb163905..4212e48ab 100644
--- a/native/spark-expr/src/scalar_funcs.rs
+++ b/native/spark-expr/src/scalar_funcs.rs
@@ -36,7 +36,6 @@ use num::{
 };
 use std::fmt::Write;
 use std::{cmp::min, sync::Arc};
-use unicode_segmentation::UnicodeSegmentation;
 
 mod unhex;
 pub use unhex::spark_unhex;
@@ -412,7 +411,6 @@ fn spark_rpad_internal<T: OffsetSizeTrait>(
 ) -> Result<ColumnarValue, DataFusionError> {
     let string_array = as_generic_string_array::<T>(array)?;
     let length = 0.max(length) as usize;
-    let empty_str = "";
     let space_string = " ".repeat(length);
 
     let mut builder =
@@ -421,21 +419,16 @@
     for string in string_array.iter() {
         match string {
             Some(string) => {
-                if length == 0 {
-                    builder.append_value(empty_str);
-                } else if length == 1 && string.len() > 0 {
-                    // Special case: when length == 1, there is no need to compute the expensive grapheme count
+                // It looks like Spark's UTF8String is closer to chars than to graphemes
+                // https://stackoverflow.com/a/46290728
+                let char_len = string.chars().count();
+                if length <= char_len {
                     builder.append_value(string);
                 } else {
-                    let graphemes_len = string.graphemes(true).count();
-                    if length <= graphemes_len {
-                        builder.append_value(string);
-                    } else {
-                        // write_str updates only the value buffer, not the null or offset buffers
-                        // This is convenient for concatenating strings
-                        builder.write_str(string)?;
-                        builder.append_value(&space_string[graphemes_len..]);
-                    }
+                    // write_str updates only the value buffer, not the null or offset buffers
+                    // This is convenient for concatenating strings
+                    builder.write_str(string)?;
+                    builder.append_value(&space_string[char_len..]);
                 }
             }
             _ => builder.append_null(),
diff --git a/spark/src/test/resources/tpcds-micro-benchmarks/char_type.sql b/spark/src/test/resources/tpcds-micro-benchmarks/char_type.sql
new file mode 100644
index 000000000..8a5359d4c
--- /dev/null
+++ b/spark/src/test/resources/tpcds-micro-benchmarks/char_type.sql
@@ -0,0 +1,7 @@
+SELECT
+  cd_gender
+FROM customer_demographics
+WHERE
+  cd_gender = 'M' AND
+  cd_marital_status = 'S' AND
+  cd_education_status = 'College'
diff --git a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
index 01909a4d7..9e6c2fd7b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/benchmark/CometTPCDSMicroBenchmark.scala
@@ -62,6 +62,7 @@ object CometTPCDSMicroBenchmark extends CometTPCQueryBenchmarkBase {
     "agg_sum_integers_no_grouping",
     "case_when_column_or_null",
     "case_when_scalar",
+    "char_type",
     "filter_highly_selective",
     "filter_less_selective",
     "if_column_or_null",
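The char-versus-grapheme distinction behind the change above is easy to check with the standard library alone: 'é' can be encoded either as one precomposed code point or as 'e' plus a combining accent, and chars() — which iterates Unicode scalar values, the closer match to Spark's UTF8String per the comment above — tells the two forms apart, while a grapheme segmenter would not. A small sketch:

fn main() {
    let decomposed = "e\u{301}"; // 'e' followed by COMBINING ACUTE ACCENT
    let precomposed = "\u{e9}"; // single code point U+00E9, also rendered as 'é'

    // chars() iterates Unicode scalar values (code points).
    assert_eq!(decomposed.chars().count(), 2);
    assert_eq!(precomposed.chars().count(), 1);

    // Neither matches byte length, which is what str::len returns.
    assert_eq!(decomposed.len(), 3);
    assert_eq!(precomposed.len(), 2);

    // unicode-segmentation's graphemes(true) would count 1 for both forms,
    // which is why the dependency can be dropped in the next patch.
}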
From 567b3ec95a46025f13aafc040c9c955ff8a0d019 Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Sat, 3 Aug 2024 14:34:28 -0700
Subject: [PATCH 3/7] fix: Optimize rpad

---
 native/Cargo.lock            | 1 -
 native/spark-expr/Cargo.toml | 1 -
 2 files changed, 2 deletions(-)

diff --git a/native/Cargo.lock b/native/Cargo.lock
index 3f6b1d1c7..5efc1098c 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -942,7 +942,6 @@ dependencies = [
  "regex",
  "thiserror",
  "twox-hash",
- "unicode-segmentation",
 ]
 
 [[package]]
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index 96eae39ff..1a8c8aeb4 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -41,7 +41,6 @@ chrono-tz = { workspace = true }
 num = { workspace = true }
 regex = { workspace = true }
 thiserror = { workspace = true }
-unicode-segmentation = "1.11.0"
 
 [dev-dependencies]
 arrow-data = {workspace = true}
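With grapheme segmentation gone, one invariant in the padding code above deserves a note: &space_string[char_len..] slices by byte offset, but the pad string is pure ASCII, so byte and char offsets coincide and every char_len <= length lands on a valid boundary. A std-only sketch of the invariant (illustrative, not Comet code):

fn main() {
    let length = 4;
    let space_string = " ".repeat(length);

    // One byte per ASCII space, so any char_len <= length is a valid
    // byte index and yields exactly (length - char_len) pad spaces.
    for char_len in 0..=length {
        assert_eq!(space_string[char_len..].chars().count(), length - char_len);
    }

    // The same assumption would not hold for arbitrary UTF-8:
    // &"é"[1..] panics because byte 1 splits a two-byte character.
}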
From 46434054dba8eff446e2fdff1ffa3dcc9b6186f1 Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Wed, 7 Aug 2024 10:38:37 -0700
Subject: [PATCH 4/7] address review comments

---
 .../datafusion/expressions/comet_scalar_funcs.rs  | 10 +++++-----
 native/core/src/execution/datafusion/planner.rs   | 15 ++++++++++-----
 native/spark-expr/src/scalar_funcs.rs             | 12 ++++++------
 .../org/apache/comet/serde/QueryPlanSerde.scala   |  4 ++--
 .../org/apache/comet/CometExpressionSuite.scala   | 12 ++++++++++++
 .../spark/sql/comet/CometPlanStabilitySuite.scala |  2 --
 6 files changed, 35 insertions(+), 20 deletions(-)

diff --git a/native/core/src/execution/datafusion/expressions/comet_scalar_funcs.rs b/native/core/src/execution/datafusion/expressions/comet_scalar_funcs.rs
index 70cbdebae..1203f90d7 100644
--- a/native/core/src/execution/datafusion/expressions/comet_scalar_funcs.rs
+++ b/native/core/src/execution/datafusion/expressions/comet_scalar_funcs.rs
@@ -21,8 +21,8 @@ use datafusion_comet_spark_expr::scalar_funcs::hash_expressions::{
 };
 use datafusion_comet_spark_expr::scalar_funcs::{
     spark_ceil, spark_decimal_div, spark_floor, spark_hex, spark_isnan, spark_make_decimal,
-    spark_murmur3_hash, spark_round, spark_rpad, spark_unhex, spark_unscaled_value, spark_xxhash64,
-    SparkChrFunc,
+    spark_murmur3_hash, spark_read_side_padding, spark_round, spark_unhex, spark_unscaled_value,
+    spark_xxhash64, SparkChrFunc,
 };
 use datafusion_common::{DataFusionError, Result as DataFusionResult};
 use datafusion_expr::registry::FunctionRegistry;
@@ -67,9 +67,9 @@ pub fn create_comet_physical_fun(
         "floor" => {
             make_comet_scalar_udf!("floor", spark_floor, data_type)
         }
-        "rpad" => {
-            let func = Arc::new(spark_rpad);
-            make_comet_scalar_udf!("rpad", func, without data_type)
+        "read_side_padding" => {
+            let func = Arc::new(spark_read_side_padding);
+            make_comet_scalar_udf!("read_side_padding", func, without data_type)
         }
         "round" => {
             make_comet_scalar_udf!("round", spark_round, data_type)
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index 5bfd76797..51746520f 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -1660,11 +1660,16 @@ impl PhysicalPlanner {
 
         let data_type = match expr.return_type.as_ref().map(to_arrow_datatype) {
             Some(t) => t,
-            None => self
-                .session_ctx
-                .udf(fun_name)?
-                .inner()
-                .return_type(&input_expr_types)?,
+            None => {
+                let fun_name = match fun_name.as_str() {
+                    "read_side_padding" => "rpad", // use the same return type as rpad
+                    other => other,
+                };
+                self.session_ctx
+                    .udf(fun_name)?
+                    .inner()
+                    .return_type(&input_expr_types)?
+            }
         };
 
         let fun_expr =
diff --git a/native/spark-expr/src/scalar_funcs.rs b/native/spark-expr/src/scalar_funcs.rs
index 4212e48ab..ffd6fd212 100644
--- a/native/spark-expr/src/scalar_funcs.rs
+++ b/native/spark-expr/src/scalar_funcs.rs
@@ -387,25 +387,25 @@
 }
 
 /// Similar to DataFusion `rpad`, but does not truncate when the string is already longer than length
-pub fn spark_rpad(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
+pub fn spark_read_side_padding(args: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
     match args {
         [ColumnarValue::Array(array), ColumnarValue::Scalar(ScalarValue::Int32(Some(length)))] => {
             match array.data_type() {
-                DataType::Utf8 => spark_rpad_internal::<i32>(array, *length),
-                DataType::LargeUtf8 => spark_rpad_internal::<i64>(array, *length),
+                DataType::Utf8 => spark_read_side_padding_internal::<i32>(array, *length),
+                DataType::LargeUtf8 => spark_read_side_padding_internal::<i64>(array, *length),
                 // TODO: handle Dictionary types
                 other => Err(DataFusionError::Internal(format!(
-                    "Unsupported data type {other:?} for function rpad",
+                    "Unsupported data type {other:?} for function read_side_padding",
                 ))),
             }
         }
         other => Err(DataFusionError::Internal(format!(
-            "Unsupported arguments {other:?} for function rpad",
+            "Unsupported arguments {other:?} for function read_side_padding",
         ))),
     }
 }
 
-fn spark_rpad_internal<T: OffsetSizeTrait>(
+fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
     array: &ArrayRef,
     length: i32,
 ) -> Result<ColumnarValue, DataFusionError> {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index e5acd245c..898041198 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2143,7 +2143,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
     }
 
     // With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for
-    // char types. Use rpad to achieve the behavior.
+    // char types.
     // See https://github.com/apache/spark/pull/38151
     case s: StaticInvoke
         if s.staticObject.isInstanceOf[Class[CharVarcharCodegenUtils]] &&
@@ -2159,7 +2159,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
         if (argsExpr.forall(_.isDefined)) {
           val builder = ExprOuterClass.ScalarFunc.newBuilder()
-          builder.setFunc("rpad")
+          builder.setFunc("read_side_padding")
           argsExpr.foreach(arg => builder.addArgs(arg.get))
           Some(ExprOuterClass.Expr.newBuilder().setScalarFunc(builder).build())
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index c22c6b06a..fcaf4efaf 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1756,4 +1756,16 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("readSidePadding") {
+    // https://stackoverflow.com/a/46290728
+    val table = "test"
+    withTable(table) {
+      sql(s"create table $table(col1 CHAR(2)) using parquet")
+      sql(s"insert into $table values('é')") // unicode 'e\\u{301}'
+      sql(s"insert into $table values('é')") // unicode '\\u{e9}'
+
+      checkSparkAnswerAndOperator(s"SELECT * FROM $table")
+    }
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 62c9d0224..f50cb3f9b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -256,14 +256,12 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
     val queryString = resourceToString(
       s"$tpcdsGroup/$query.sql",
       classLoader = Thread.currentThread().getContextClassLoader)
-    // Disable char/varchar read-side handling for better performance.
     withSQLConf(
       CometConf.COMET_ENABLED.key -> "true",
       CometConf.COMET_EXEC_ENABLED.key -> "true",
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
       CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
       CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
-      "spark.sql.readSideCharPadding" -> "false",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
       val qe = sql(queryString).queryExecution
       val plan = qe.executedPlan
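The "// TODO: handle Dictionary types" carried through the hunks above is left open by this series. One plausible way to close it, sketched here purely as an assumption rather than anything the PR implements, is to decode dictionary-encoded strings to plain Utf8 with Arrow's cast kernel and then fall through to the existing Utf8 match arm:

use arrow::array::{ArrayRef, DictionaryArray, StringArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, Int32Type};
use std::sync::Arc;

fn main() {
    // Dictionary-encoded strings, as Parquet scans often produce them.
    let dict: DictionaryArray<Int32Type> =
        vec![Some("ab"), None, Some("ab")].into_iter().collect();
    let array: ArrayRef = Arc::new(dict);

    // Decode to plain Utf8 first; the existing Utf8/LargeUtf8 arms
    // could then pad the decoded array as before.
    let plain = cast(array.as_ref(), &DataType::Utf8).unwrap();
    let strings = plain.as_any().downcast_ref::<StringArray>().unwrap();
    assert_eq!(strings.value(0), "ab");
    assert!(strings.is_null(1));
}

Casting materializes every value, so a production version would more likely pad the dictionary's values array once and reuse the keys; the cast route is simply the shortest correct fallback.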
From f5d128c49862cefe3947d43bb5009d4658409565 Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Wed, 7 Aug 2024 14:44:46 -0700
Subject: [PATCH 5/7] address review comments

---
 .../apache/comet/CometExpressionSuite.scala   | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index fcaf4efaf..341df1bb3 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1726,6 +1726,18 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
     }
   }
 
+  test("readSidePadding") {
+    // https://stackoverflow.com/a/46290728
+    val table = "test"
+    withTable(table) {
+      sql(s"create table $table(col1 CHAR(2)) using parquet")
+      sql(s"insert into $table values('é')") // unicode 'e\\u{301}'
+      sql(s"insert into $table values('é')") // unicode '\\u{e9}'
+
+      checkSparkAnswerAndOperator(s"SELECT * FROM $table")
+    }
+  }
+
   test("isnan") {
     Seq("true", "false").foreach { dictionary =>
       withSQLConf("parquet.enable.dictionary" -> dictionary) {
@@ -1756,16 +1768,4 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
       }
     }
   }
-
-  test("readSidePadding") {
-    // https://stackoverflow.com/a/46290728
-    val table = "test"
-    withTable(table) {
-      sql(s"create table $table(col1 CHAR(2)) using parquet")
-      sql(s"insert into $table values('é')") // unicode 'e\\u{301}'
-      sql(s"insert into $table values('é')") // unicode '\\u{e9}'
-
-      checkSparkAnswerAndOperator(s"SELECT * FROM $table")
-    }
-  }
 }
From d647fe42c98189cb48c61f7d84735a0ab25885e9 Mon Sep 17 00:00:00 2001
From: Kazuyuki Tanimura
Date: Wed, 7 Aug 2024 17:01:11 -0700
Subject: [PATCH 6/7] address review comments

---
 .../org/apache/spark/sql/comet/CometPlanStabilitySuite.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index f50cb3f9b..62c9d0224 100644
--- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -256,12 +256,14 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
     val queryString = resourceToString(
       s"$tpcdsGroup/$query.sql",
      classLoader = Thread.currentThread().getContextClassLoader)
+    // Disable char/varchar read-side handling for better performance.
withSQLConf( CometConf.COMET_ENABLED.key -> "true", CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 + "spark.sql.readSideCharPadding" -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") { val qe = sql(queryString).queryExecution val plan = qe.executedPlan From a5f75a12e772779158a2c7dc109981fea018437b Mon Sep 17 00:00:00 2001 From: Kazuyuki Tanimura Date: Wed, 7 Aug 2024 17:11:32 -0700 Subject: [PATCH 7/7] address review comments --- .../src/test/scala/org/apache/comet/CometExpressionSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala index bab8c2bad..ded5bc5c5 100644 --- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala @@ -1918,6 +1918,8 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper { sql(s"create table $table(col1 CHAR(2)) using parquet") sql(s"insert into $table values('é')") // unicode 'e\\u{301}' sql(s"insert into $table values('é')") // unicode '\\u{e9}' + sql(s"insert into $table values('')") + sql(s"insert into $table values('ab')") checkSparkAnswerAndOperator(s"SELECT * FROM $table") }
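Taken together, the four rows now inserted by the test exercise every branch of the char-based padding for a CHAR(2) column. A std-only sketch restating the per-row logic of spark_read_side_padding, useful for checking the expected results (not the Comet code itself):

// Restates the per-row padding logic for a CHAR(2) column.
fn read_side_pad(s: &str, length: usize) -> String {
    let char_len = s.chars().count();
    if length <= char_len {
        s.to_string() // already wide enough; never truncate
    } else {
        let mut padded = s.to_string();
        padded.push_str(&" ".repeat(length - char_len));
        padded
    }
}

fn main() {
    assert_eq!(read_side_pad("", 2), "  "); // empty string: fully padded
    assert_eq!(read_side_pad("ab", 2), "ab"); // exactly 2 chars: unchanged
    assert_eq!(read_side_pad("\u{e9}", 2), "\u{e9} "); // precomposed é: 1 char, one pad space
    assert_eq!(read_side_pad("e\u{301}", 2), "e\u{301}"); // decomposed é: 2 chars, unchanged
}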