diff --git a/integration_tests/src/main/python/hybrid_parquet_test.py b/integration_tests/src/main/python/hybrid_parquet_test.py
index 977a7d3ab0c..9482616366d 100644
--- a/integration_tests/src/main/python/hybrid_parquet_test.py
+++ b/integration_tests/src/main/python/hybrid_parquet_test.py
@@ -227,3 +227,22 @@ def udf_fallback(s):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: spark.read.parquet(data_path).filter("ascii(a) >= 50 and udf_fallback(a) = 'udf_100'"),
         conf=filter_split_conf)
+
+@pytest.mark.skipif(is_databricks_runtime(), reason="Hybrid feature does not support Databricks currently")
+@pytest.mark.skipif(not is_hybrid_backend_loaded(), reason="HybridScan specialized tests")
+@hybrid_test
+@allow_non_gpu(*non_utc_allow)
+def test_hybrid_parquet_filter_pushdown_timestamp(spark_tmp_path):
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    with_cpu_session(
+        lambda spark: gen_df(spark, [('a', TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)))]).write.parquet(data_path),
+        conf=rebase_write_corrected_conf)
+
+    # Timestamp is not fully supported in Hybrid Filter, so it should remain on the GPU
+    plan = with_gpu_session(
+        lambda spark: spark.read.parquet(data_path).filter(f.col("a") > f.lit(datetime(2024, 1, 1, tzinfo=timezone.utc)))._jdf.queryExecution().executedPlan(),
+        conf=filter_split_conf)
+    check_filter_pushdown(plan, pushed_exprs=[], not_pushed_exprs=['isnotnull', '>'])
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.parquet(data_path).filter(f.col("a") > f.lit(datetime(2024, 1, 1, tzinfo=timezone.utc))),
+        conf=filter_split_conf)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala
index 99dd09fec24..ec701957c17 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/rapids/hybrid/HybridExecutionUtils.scala
@@ -318,8 +318,13 @@ object HybridExecutionUtils extends PredicateHelper {
     }
   }
 
+  def isTimestampCondition(expr: Expression): Boolean = {
+    expr.references.exists(attr => attr.dataType == TimestampType)
+  }
+
   def isExprSupportedByHybridScan(condition: Expression, whitelistExprsName: String): Boolean = {
     condition match {
+      case filter if isTimestampCondition(filter) => false // Timestamp is not fully supported in Hybrid Filter
       case filter if HybridExecutionUtils.supportedByHybridFilters(whitelistExprsName)
        .exists(_.isInstance(filter)) =>
        val childrenSupported = filter.children.forall(