diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 6032d469fb2..7d041b387e4 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -485,6 +485,8 @@ def test_parquet_read_buffer_allocation_empty_blocks(spark_tmp_path, v1_enabled_
         lambda spark : spark.read.parquet(data_path).filter("id < 2 or id > 990"),
         conf=all_confs)
 
+
+@disable_ansi_mode  # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('reader_confs', reader_opt_confs)
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 @pytest.mark.skipif(is_databricks_runtime(), reason="https://github.com/NVIDIA/spark-rapids/issues/7733")
@@ -797,6 +799,8 @@ def test_parquet_read_nano_as_longs_true(std_input_path):
         'FileSourceScanExec',
         conf=conf)
 
+
+@disable_ansi_mode  # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_many_column_project():
     def _create_wide_data_frame(spark, num_cols):
         schema_dict = {}
@@ -1285,27 +1289,64 @@ def test_parquet_read_case_insensitivity(spark_tmp_path):
     )
 
 
-# test read INT32 as INT8/INT16/Date
-@pytest.mark.parametrize('reader_confs', reader_opt_confs)
-@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_parquet_int32_downcast(spark_tmp_path, reader_confs, v1_enabled_list):
+def run_test_parquet_int32_downcast(spark_tmp_path,
+                                    reader_confs,
+                                    v1_enabled_list,
+                                    ansi_conf):
+    """
+    Tests whether Parquet files with columns written as INT32 can be read
+    as INT8, INT16 and DATE columns, with ANSI mode enabled or disabled
+    as specified by ansi_conf.
+    """
     data_path = spark_tmp_path + '/PARQUET_DATA'
     write_schema = [("d", date_gen), ('s', short_gen), ('b', byte_gen)]
+
+    # For test setup, write with ANSI disabled.
+    # Otherwise, CAST(d AS INT) will fail on Spark CPU.
     with_cpu_session(
         lambda spark: gen_df(spark, write_schema).selectExpr(
             "cast(d as Int) as d",
             "cast(s as Int) as s",
-            "cast(b as Int) as b").write.parquet(data_path))
+            "cast(b as Int) as b").write.parquet(data_path), conf=ansi_disabled_conf)
 
     read_schema = StructType([StructField("d", DateType()),
                               StructField("s", ShortType()),
                               StructField("b", ByteType())])
     conf = copy_and_update(reader_confs,
-                           {'spark.sql.sources.useV1SourceList': v1_enabled_list})
+                           {'spark.sql.sources.useV1SourceList': v1_enabled_list},
+                           ansi_conf)
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark: spark.read.schema(read_schema).parquet(data_path),
         conf=conf)
 
+
+@pytest.mark.parametrize('reader_confs', reader_opt_confs)
+@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
+def test_parquet_int32_downcast_ansi_disabled(spark_tmp_path, reader_confs, v1_enabled_list):
+    """
+    Tests whether Parquet files with columns written as INT32 can be read
+    as INT8, INT16 and DATE columns, with ANSI mode disabled.
+    """
+    run_test_parquet_int32_downcast(spark_tmp_path,
+                                    reader_confs,
+                                    v1_enabled_list,
+                                    ansi_disabled_conf)
+
+
+def test_parquet_int32_downcast_ansi_enabled(spark_tmp_path):
+    """
+    The ANSI-enabled counterpart of test_parquet_int32_downcast_ansi_disabled:
+    the same INT32 columns are read back as INT8, INT16 and DATE, now with
+    ANSI mode enabled. Only a single combination of test parameters is
+    exercised here, in the interest of brevity.
+ """ + run_test_parquet_int32_downcast(spark_tmp_path, + reader_confs=native_parquet_file_reader_conf, + v1_enabled_list="", + ansi_conf=ansi_disabled_conf) + + @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @pytest.mark.parametrize("types", [("byte", "short"), ("byte", "int"), ("short", "int")], ids=idfn) @@ -1340,6 +1381,10 @@ def test_parquet_nested_column_missing(spark_tmp_path, reader_confs, v1_enabled_ lambda spark: spark.read.schema(read_schema).parquet(data_path), conf=conf) +@pytest.mark.skipif(condition=is_databricks_runtime() and is_databricks_version_or_later(14,3), + reason="https://github.com/NVIDIA/spark-rapids/issues/11512") +@pytest.mark.skipif(condition=is_spark_400_or_later(), + reason="https://github.com/NVIDIA/spark-rapids/issues/11512") def test_parquet_check_schema_compatibility(spark_tmp_path): data_path = spark_tmp_path + '/PARQUET_DATA' gen_list = [('int', int_gen), ('long', long_gen), ('dec32', decimal_gen_32bit)] @@ -1431,13 +1476,16 @@ def test_parquet_read_encryption(spark_tmp_path, reader_confs, v1_enabled_list): assert_spark_exception( lambda: with_gpu_session( lambda spark: spark.read.parquet(data_path).collect()), - error_message='Could not read footer for file') + error_message='Could not read footer') # Common message fragment between all Spark versions. + # Note that this isn't thrown explicitly by the plugin. assert_spark_exception( lambda: with_gpu_session( lambda spark: spark.read.parquet(data_path).collect(), conf=conf), error_message='The GPU does not support reading encrypted Parquet files') + +@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114 def test_parquet_read_count(spark_tmp_path): parquet_gens = [int_gen, string_gen, double_gen] gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]