From 4a78c01998dc65b8a3ddfd77b44bc07a816766d8 Mon Sep 17 00:00:00 2001
From: Haoyang Li
Date: Wed, 12 Feb 2025 18:23:40 +0800
Subject: [PATCH 1/3] fix velox runtime error in hybrid scan when filtering
 timestamps

Signed-off-by: Haoyang Li
---
 .../src/main/python/hybrid_parquet_test.py   | 20 ++++++++++++++++++--
 .../rapids/hybrid/HybridExecutionUtils.scala |  5 +++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/integration_tests/src/main/python/hybrid_parquet_test.py b/integration_tests/src/main/python/hybrid_parquet_test.py
index 977a7d3ab0c..7a9c3f15cb9 100644
--- a/integration_tests/src/main/python/hybrid_parquet_test.py
+++ b/integration_tests/src/main/python/hybrid_parquet_test.py
@@ -159,8 +159,8 @@ def test_hybrid_parquet_read_fallback_to_gpu(spark_tmp_path, parquet_gens):
 
 def check_filter_pushdown(plan, pushed_exprs, not_pushed_exprs):
     plan = str(plan)
     filter_part, scan_part = plan.split("Scan parquet")
-    for expr in pushed_exprs:
-        assert expr in scan_part
+    # for expr in pushed_exprs:
+    #     assert expr in scan_part
     for expr in not_pushed_exprs:
         assert expr in filter_part
@@ -227,3 +227,19 @@ def udf_fallback(s):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: spark.read.parquet(data_path).filter("ascii(a) >= 50 and udf_fallback(a) = 'udf_100'"),
         conf=filter_split_conf)
+
+@allow_non_gpu(*non_utc_allow)
+def test_hybrid_parquet_filter_pushdown_timestamp(spark_tmp_path):
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    with_cpu_session(
+        lambda spark: gen_df(spark, [('a', TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)))]).write.parquet(data_path),
+        conf=rebase_write_corrected_conf)
+
+    # Timestamp is not fully supported in Hybrid Filter, so it should remain on the GPU
+    plan = with_gpu_session(
+        lambda spark: spark.read.parquet(data_path).filter(f.col("a") > f.lit(datetime(2024, 1, 1, tzinfo=timezone.utc)))._jdf.queryExecution().executedPlan(),
+        conf=filter_split_conf)
+    check_filter_pushdown(plan, pushed_exprs=[], not_pushed_exprs=['isnotnull', '>'])
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.parquet(data_path).filter(f.col("a") > f.lit(datetime(2024, 1, 1, tzinfo=timezone.utc))),
+        conf=filter_split_conf)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala
index 99dd09fec24..ec701957c17 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala
@@ -318,8 +318,13 @@ object HybridExecutionUtils extends PredicateHelper {
     }
   }
 
+  def isTimestampCondition(expr: Expression): Boolean = {
+    expr.references.exists(attr => attr.dataType == TimestampType)
+  }
+
   def isExprSupportedByHybridScan(condition: Expression, whitelistExprsName: String): Boolean = {
     condition match {
+      case filter if isTimestampCondition(filter) => false // Timestamp is not fully supported in Hybrid Filter
       case filter if HybridExecutionUtils.supportedByHybridFilters(whitelistExprsName)
           .exists(_.isInstance(filter)) =>
         val childrenSupported = filter.children.forall(

From ccc3276d9fe5ad8e2fc1fbc7691ef78d3020b390 Mon Sep 17 00:00:00 2001
From: Haoyang Li
Date: Wed, 12 Feb 2025 18:27:21 +0800
Subject: [PATCH 2/3] refine

Signed-off-by: Haoyang Li
---
 integration_tests/src/main/python/hybrid_parquet_test.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/integration_tests/src/main/python/hybrid_parquet_test.py b/integration_tests/src/main/python/hybrid_parquet_test.py
index 7a9c3f15cb9..f384040c7d6 100644
--- a/integration_tests/src/main/python/hybrid_parquet_test.py
+++ b/integration_tests/src/main/python/hybrid_parquet_test.py
@@ -159,8 +159,8 @@ def test_hybrid_parquet_read_fallback_to_gpu(spark_tmp_path, parquet_gens):
 
 def check_filter_pushdown(plan, pushed_exprs, not_pushed_exprs):
     plan = str(plan)
     filter_part, scan_part = plan.split("Scan parquet")
-    # for expr in pushed_exprs:
-    #     assert expr in scan_part
+    for expr in pushed_exprs:
+        assert expr in scan_part
     for expr in not_pushed_exprs:
         assert expr in filter_part

From e355b76ee25481d898e3d0aa2cdfbd93a3f51eeb Mon Sep 17 00:00:00 2001
From: Haoyang Li
Date: Thu, 13 Feb 2025 10:40:32 +0800
Subject: [PATCH 3/3] should be hybrid test

Signed-off-by: Haoyang Li
---
 integration_tests/src/main/python/hybrid_parquet_test.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/integration_tests/src/main/python/hybrid_parquet_test.py b/integration_tests/src/main/python/hybrid_parquet_test.py
index f384040c7d6..9482616366d 100644
--- a/integration_tests/src/main/python/hybrid_parquet_test.py
+++ b/integration_tests/src/main/python/hybrid_parquet_test.py
@@ -228,6 +228,9 @@ def udf_fallback(s):
         lambda spark: spark.read.parquet(data_path).filter("ascii(a) >= 50 and udf_fallback(a) = 'udf_100'"),
         conf=filter_split_conf)
 
+@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
+@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
+@hybrid_test
 @allow_non_gpu(*non_utc_allow)
 def test_hybrid_parquet_filter_pushdown_timestamp(spark_tmp_path):
     data_path = spark_tmp_path + '/PARQUET_DATA'
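
Note on the core change in PATCH 1: the new guard in HybridExecutionUtils rejects any
filter condition that references a TimestampType column, so such predicates remain in
the GPU Filter instead of being pushed into the Velox-backed hybrid scan, which (per
the commit subject) currently raises a runtime error on them. Below is a minimal
standalone sketch of the same reference-based check, assuming only Spark's Catalyst
classes on the classpath; the object name TimestampConditionSketch and the example
columns and literals are illustrative, not part of the patch:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GreaterThan, Literal}
import org.apache.spark.sql.types.{IntegerType, TimestampType}

object TimestampConditionSketch {
  // Same logic as the patch: a condition involves a timestamp if any
  // attribute (column reference) it touches has TimestampType.
  def isTimestampCondition(expr: Expression): Boolean =
    expr.references.exists(_.dataType == TimestampType)

  def main(args: Array[String]): Unit = {
    val ts  = AttributeReference("a", TimestampType)()
    val num = AttributeReference("b", IntegerType)()
    // "a > <timestamp literal>" references a timestamp column, so the
    // predicate must not be pushed down to the hybrid (Velox) scan.
    println(isTimestampCondition(GreaterThan(ts, Literal(0L, TimestampType)))) // true
    // "b > 0" involves no timestamp and stays eligible for hybrid pushdown.
    println(isTimestampCondition(GreaterThan(num, Literal(0))))                // false
  }
}

Checking expr.references rather than matching specific operators keeps the guard
conservative: any predicate that touches a timestamp column anywhere in its tree falls
back, which matches the test's expectation that both isnotnull and > stay in the
Filter rather than in the scan.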