Spark 4: Fix miscellaneous tests including logic, repart, hive_delimited. [databricks] (#11129)

Fixes #11031.

This PR addresses tests that fail on Spark 4.0 in the following files:
1. `integration_tests/src/main/python/datasourcev2_read_test.py`
2. `integration_tests/src/main/python/expand_exec_test.py`
3. `integration_tests/src/main/python/get_json_test.py`
4. `integration_tests/src/main/python/hive_delimited_text_test.py`
5. `integration_tests/src/main/python/logic_test.py`
6. `integration_tests/src/main/python/repart_test.py`
7. `integration_tests/src/main/python/time_window_test.py`
8. `integration_tests/src/main/python/json_matrix_test.py`
9. `integration_tests/src/main/python/misc_expr_test.py`
10. `integration_tests/src/main/python/orc_write_test.py`

Signed-off-by: MithunR <mithunr@nvidia.com>
mythrocks authored Jul 18, 2024
1 parent 7027304 commit 125feb2
Showing 10 changed files with 112 additions and 23 deletions.
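
Most of the changes below follow one of two recurring patterns: tests that depend on COUNT aggregation (not yet supported in ANSI mode; see https://github.com/NVIDIA/spark-rapids/issues/5114) are opted out of ANSI mode with the disable_ansi_mode mark, and tests that exercise behaviour not yet implemented for Spark 4.0 are skipped via a version check. A minimal sketch of both patterns follows; the test names and bodies are hypothetical, only the marks and helpers (disable_ansi_mode, is_before_spark_400) come from the changed files.

# Illustrative sketch only -- not part of this commit's diff.
import pytest
from marks import disable_ansi_mode            # mark used throughout these integration tests
from spark_session import is_before_spark_400  # version helper imported in the changed files

@disable_ansi_mode  # Cannot run in ANSI mode until COUNT aggregation is supported.
# See https://github.com/NVIDIA/spark-rapids/issues/5114
def test_something_that_counts():  # hypothetical test name
    ...

@pytest.mark.skipif(condition=not is_before_spark_400(),
                    reason="Feature not yet implemented for Spark 4.0.")
def test_something_not_ready_on_spark_400():  # hypothetical test name
    ...
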
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/datasourcev2_read_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -41,12 +41,16 @@ def test_read_all_types():
readTable("int,bool,byte,short,long,string,float,double,date,timestamp", columnarClass),
conf={'spark.rapids.sql.castFloatToString.enabled': 'true'})


@disable_ansi_mode # Cannot run in ANSI mode until COUNT aggregation is supported.
# See https://github.com/NVIDIA/spark-rapids/issues/5114
@validate_execs_in_gpu_plan('HostColumnarToGpu')
def test_read_all_types_count():
assert_gpu_and_cpu_row_counts_equal(
readTable("int,bool,byte,short,long,string,float,double,date,timestamp", columnarClass),
conf={'spark.rapids.sql.castFloatToString.enabled': 'true'})


@validate_execs_in_gpu_plan('HostColumnarToGpu')
def test_read_arrow_off():
assert_gpu_and_cpu_are_equal_collect(
8 changes: 6 additions & 2 deletions integration_tests/src/main/python/expand_exec_test.py
@@ -15,8 +15,8 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
import pyspark.sql.functions as f
from marks import ignore_order
from marks import disable_ansi_mode, ignore_order
from pyspark.sql import functions as f

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
# Many Spark versions have issues sorting large decimals,
@@ -38,6 +38,10 @@ def op_df(spark, length=2048):
"select count(b), count(c) from pre_pro group by cube((a+b), if((a+b)>100, c, null))",
"select count(b), count(c) from pre_pro group by rollup((a+b), if((a+b)>100, c, null))"]


@disable_ansi_mode # Cannot run in ANSI mode until COUNT aggregation is supported.
# See https://github.com/NVIDIA/spark-rapids/issues/5114
# Additionally, this test should protect against overflow on (a+b).
@ignore_order(local=True)
@pytest.mark.parametrize('sql', pre_pro_sqls, ids=["distinct_agg", "cube", "rollup"])
def test_expand_pre_project(sql):
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/get_json_test.py
@@ -18,7 +18,8 @@
from data_gen import *
from pyspark.sql.types import *
from marks import *
from spark_session import is_databricks113_or_later, is_databricks_runtime
from spark_init_internal import spark_version
from spark_session import is_before_spark_400, is_databricks113_or_later, is_databricks_runtime

def mk_json_str_gen(pattern):
return StringGen(pattern).with_special_case('').with_special_pattern('.{0,10}')
@@ -126,6 +127,9 @@ def test_get_json_object_normalize_non_string_output():
f.get_json_object('jsonStr', '$')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})


@pytest.mark.skipif(condition=not is_before_spark_400(),
reason="https://github.com/NVIDIA/spark-rapids/issues/11130")
def test_get_json_object_quoted_question():
schema = StructType([StructField("jsonStr", StringType())])
data = [[r'{"?":"QUESTION"}']]
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/hive_delimited_text_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -425,6 +425,8 @@ def create_hive_table_with_custom_timestamp_format(spark, property_location):
conf=hive_text_enabled_conf)


@disable_ansi_mode # Cannot run in ANSI mode until COUNT aggregation is supported.
# See https://github.com/NVIDIA/spark-rapids/issues/5114
@pytest.mark.skipif(is_spark_cdh(),
reason="Hive text reads are disabled on CDH, as per "
"https://github.com/NVIDIA/spark-rapids/pull/7628")
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/json_matrix_test.py
@@ -723,8 +723,8 @@ def test_from_json_decs(std_input_path, input_file, dt):
pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10479')),
pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')),
pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278')),
"int_struct_formatted.json",
pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278'))])
pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278')),
pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11154'))])
@pytest.mark.parametrize('read_func', [read_json_df])
def test_scan_json_strings(std_input_path, read_func, spark_tmp_table_factory, input_file):
assert_gpu_and_cpu_are_equal_collect(
10 changes: 7 additions & 3 deletions integration_tests/src/main/python/logic_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error
from data_gen import *
from spark_session import is_before_spark_330
from spark_session import is_before_spark_330, is_before_spark_400
from marks import incompat, approximate_float
from pyspark.sql.types import *
import pyspark.sql.functions as f
@@ -74,12 +74,16 @@ def do_it(spark, lhs_bool_arg, arith_arg, op):
ansi_conf = {'spark.sql.ansi.enabled': ansi_enabled}
bypass_map = {'AND': 'a', 'OR': 'b'}
expect_error = int_arg == INT_MAX and (lhs_arg == 'NULL' or bypass_map[logic_op] != lhs_arg)
exception = \
"java.lang.ArithmeticException" if is_before_spark_330() else \
"SparkArithmeticException" if is_before_spark_400() else \
"pyspark.errors.exceptions.captured.ArithmeticException"

if ansi_enabled == 'true' and expect_error:
assert_gpu_and_cpu_error(
df_fun=lambda spark: do_it(spark, lhs_arg, int_arg, logic_op).collect(),
conf=ansi_conf,
error_message="java.lang.ArithmeticException" if is_before_spark_330() else "SparkArithmeticException")
error_message=exception)
else:
assert_gpu_and_cpu_are_equal_collect(
func=lambda spark: do_it(spark, lhs_arg, int_arg, logic_op),
7 changes: 6 additions & 1 deletion integration_tests/src/main/python/misc_expr_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
from marks import incompat, approximate_float
from pyspark.sql.types import *
import pyspark.sql.functions as f
from spark_session import is_before_spark_400

def test_mono_id():
assert_gpu_and_cpu_are_equal_collect(
@@ -32,6 +33,10 @@ def test_part_id():
f.col('a'),
f.spark_partition_id()))


@pytest.mark.skipif(condition=not is_before_spark_400(),
reason="raise_error() not currently implemented for Spark 4.0. "
"See https://github.com/NVIDIA/spark-rapids/issues/10107.")
def test_raise_error():
data_gen = ShortGen(nullable=False, min_val=0, max_val=20, special_cases=[])
assert_gpu_and_cpu_are_equal_collect(
11 changes: 8 additions & 3 deletions integration_tests/src/main/python/orc_write_test.py
@@ -15,7 +15,7 @@
import pytest

from asserts import assert_gpu_and_cpu_writes_are_equal_collect, assert_gpu_fallback_write
from spark_session import is_before_spark_320, is_spark_321cdh, is_spark_cdh, with_cpu_session, with_gpu_session
from spark_session import is_before_spark_320, is_before_spark_400, is_spark_321cdh, is_spark_cdh, with_cpu_session, with_gpu_session
from conftest import is_not_utc
from datetime import date, datetime, timezone
from data_gen import *
@@ -357,6 +357,11 @@ def test_orc_write_column_name_with_dots(spark_tmp_path):
@ignore_order
def test_orc_do_not_lowercase_columns(spark_tmp_path):
data_path = spark_tmp_path + "/ORC_DATA"

# The wording of the `is not exists` error message in Spark 4.x is unfortunate, but accurate:
# https://github.com/apache/spark/blob/4501285a49e4c0429c9cf2c105f044e1c8a93d21/python/pyspark/errors/error-conditions.json#L487
expected_error_message = "No StructField named acol" if is_before_spark_400() else \
"Key `acol` is not exists."
assert_gpu_and_cpu_writes_are_equal_collect(
# column is uppercase
lambda spark, path: spark.range(0, 1000).select(col("id").alias("Acol")).write.orc(path),
@@ -367,10 +372,10 @@ def test_orc_do_not_lowercase_columns(spark_tmp_path):
with_cpu_session(lambda spark: spark.read.orc(data_path + "/CPU").schema["acol"])
assert False
except KeyError as e:
assert "No StructField named acol" in str(e)
assert expected_error_message in str(e)
try:
# reading lowercase causes exception
with_gpu_session(lambda spark: spark.read.orc(data_path + "/GPU").schema["acol"])
assert False
except KeyError as e:
assert "No StructField named acol" in str(e)
assert expected_error_message in str(e)
45 changes: 42 additions & 3 deletions integration_tests/src/main/python/repart_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect
from spark_session import is_before_spark_320, is_before_spark_330
from conftest import is_not_utc
from data_gen import *
@@ -266,7 +266,6 @@ def test_hash_fallback(data_gen):
([('a', long_gen), ('b', StructGen([('b1', long_gen)]))], ['a']),
([('a', long_gen), ('b', ArrayGen(long_gen, max_length=2))], ['a']),
([('a', byte_gen)], [f.col('a') - 5]),
([('a', long_gen)], [f.col('a') + 15]),
([('a', ArrayGen(long_gen, max_length=2)), ('b', long_gen)], ['a']),
([('a', StructGen([('aa', ArrayGen(long_gen, max_length=2))])), ('b', long_gen)], ['a']),
([('a', byte_gen), ('b', boolean_gen)], ['a', 'b']),
@@ -291,6 +290,46 @@ def test_hash_repartition_exact(gen, num_parts):
.withColumn('hashed', f.hash(*part_on))\
.selectExpr('*', 'pmod(hashed, {})'.format(num_parts)))


@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [1, 2, 10, 17, 19, 32], ids=idfn)
@pytest.mark.parametrize('is_ansi_mode', [False, True], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_hash_repartition_exact_longs_no_overflow(num_parts, is_ansi_mode):
gen = LongGen(min_val=-1000, max_val=1000, special_cases=[]) if is_ansi_mode else long_gen
data_gen = [('a', gen)]
part_on = [f.col('a') + 15]
conf = {'spark.sql.ansi.enabled': is_ansi_mode}

assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=1024)
.repartition(num_parts, *part_on)
.withColumn('id', f.spark_partition_id())
.withColumn('hashed', f.hash(*part_on))
.selectExpr('*', 'pmod(hashed, {})'.format(num_parts)), conf=conf)


@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [17], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_hash_repartition_long_overflow_ansi_exception(num_parts):
data_gen = [('a', long_gen)]
part_on = [f.col('a') + 15]
conf = ansi_enabled_conf

def test_function(spark):
return gen_df(spark, data_gen, length=1024) \
.withColumn('plus15', f.col('a') + 15) \
.repartition(num_parts, f.col('plus15')) \
.withColumn('id', f.spark_partition_id()) \
.withColumn('hashed', f.hash(*part_on)) \
.selectExpr('*', 'pmod(hashed, {})'.format(num_parts))

assert_gpu_and_cpu_error(
lambda spark: test_function(spark).collect(),
conf=conf, error_message="ArithmeticException")


# Test a query that should cause Spark to leverage getShuffleRDD
@ignore_order(local=True)
def test_union_with_filter():
34 changes: 28 additions & 6 deletions integration_tests/src/main/python/time_window_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from spark_session import is_before_spark_330

# do it over a day so we have more chance of overlapping values
_restricted_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
@@ -43,14 +44,35 @@ def test_grouped_tumbling_window(data_gen):
def test_grouped_sliding_window(data_gen):
row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max("data").alias("max_data")))
lambda spark: gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max("data").alias("max_data")))

@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)

@pytest.mark.parametrize('is_ansi_enabled',
[False,
pytest.param(True,
marks=pytest.mark.skipif(
condition=is_before_spark_330(),
reason="Prior to Spark 3.3.0, time interval calculations included "
"multiplication/division. This makes interval operations susceptible "
"to overflow-related exceptions when in ANSI mode. "
"Spark versions >= 3.3.0 do the same calculations via Mod. "
"Running this test in ANSI mode on Spark < 3.3.0 will cause aggregation "
"operations like Product to fall back to CPU. "
"See https://github.com/NVIDIA/spark-rapids/issues/5114."))])
@pytest.mark.parametrize('data_gen', [byte_gen, long_gen, string_gen], ids=idfn)
@ignore_order
def test_grouped_sliding_window_array(data_gen):
row_gen = StructGen([['ts', _restricted_ts_gen],['data', ArrayGen(data_gen)]], nullable=False)
def test_grouped_sliding_window_array(data_gen, is_ansi_enabled):
"""
When in ANSI mode, only valid indices are used.
Tests for accessing arrays with invalid indices are done in array_test.py.
"""
array_gen = ArrayGen(data_gen, min_length=4 if is_ansi_enabled else 0)
conf = {'spark.sql.ansi.enabled': is_ansi_enabled}
row_gen = StructGen([['ts', _restricted_ts_gen], ['data', array_gen]], nullable=False)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max(f.col("data")[3]).alias("max_data")))
lambda spark: gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max(f.col("data")[3]).alias("max_data")),
conf=conf)


@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
@ignore_order
