Spark 4: Fix miscellaneous tests including logic, repart, hive_delimited. [databricks] #11129

Merged
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/datasourcev2_read_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -41,12 +41,16 @@ def test_read_all_types():
readTable("int,bool,byte,short,long,string,float,double,date,timestamp", columnarClass),
conf={'spark.rapids.sql.castFloatToString.enabled': 'true'})


@disable_ansi_mode # Cannot run in ANSI mode until COUNT aggregation is supported.
# See https://github.com/NVIDIA/spark-rapids/issues/5114
@validate_execs_in_gpu_plan('HostColumnarToGpu')
def test_read_all_types_count():
assert_gpu_and_cpu_row_counts_equal(
readTable("int,bool,byte,short,long,string,float,double,date,timestamp", columnarClass),
conf={'spark.rapids.sql.castFloatToString.enabled': 'true'})


@validate_execs_in_gpu_plan('HostColumnarToGpu')
def test_read_arrow_off():
assert_gpu_and_cpu_are_equal_collect(
8 changes: 6 additions & 2 deletions integration_tests/src/main/python/expand_exec_test.py
@@ -15,8 +15,8 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
import pyspark.sql.functions as f
from marks import ignore_order
from marks import disable_ansi_mode, ignore_order
from pyspark.sql import functions as f

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
# Many Spark versions have issues sorting large decimals,
@@ -38,6 +38,10 @@ def op_df(spark, length=2048):
"select count(b), count(c) from pre_pro group by cube((a+b), if((a+b)>100, c, null))",
"select count(b), count(c) from pre_pro group by rollup((a+b), if((a+b)>100, c, null))"]


@disable_ansi_mode # Cannot run in ANSI mode until COUNT aggregation is supported.
# See https://github.com/NVIDIA/spark-rapids/issues/5114
# Additionally, this test should protect against overflow on (a+b).
@ignore_order(local=True)
@pytest.mark.parametrize('sql', pre_pro_sqls, ids=["distinct_agg", "cube", "rollup"])
def test_expand_pre_project(sql):
6 changes: 5 additions & 1 deletion integration_tests/src/main/python/get_json_test.py
@@ -18,7 +18,8 @@
from data_gen import *
from pyspark.sql.types import *
from marks import *
from spark_session import is_databricks113_or_later, is_databricks_runtime
from spark_init_internal import spark_version
from spark_session import is_before_spark_400, is_databricks113_or_later, is_databricks_runtime

def mk_json_str_gen(pattern):
return StringGen(pattern).with_special_case('').with_special_pattern('.{0,10}')
@@ -126,6 +127,9 @@ def test_get_json_object_normalize_non_string_output():
f.get_json_object('jsonStr', '$')),
conf={'spark.rapids.sql.expression.GetJsonObject': 'true'})


@pytest.mark.skipif(condition=not is_before_spark_400(),
reason="https://github.com/NVIDIA/spark-rapids/issues/11130")
def test_get_json_object_quoted_question():
schema = StructType([StructField("jsonStr", StringType())])
data = [[r'{"?":"QUESTION"}']]
integration_tests/src/main/python/hive_delimited_text_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -425,6 +425,8 @@ def create_hive_table_with_custom_timestamp_format(spark, property_location):
conf=hive_text_enabled_conf)


@disable_ansi_mode # Cannot run in ANSI mode until COUNT aggregation is supported.
# See https://github.com/NVIDIA/spark-rapids/issues/5114
@pytest.mark.skipif(is_spark_cdh(),
reason="Hive text reads are disabled on CDH, as per "
"https://github.com/NVIDIA/spark-rapids/pull/7628")
4 changes: 2 additions & 2 deletions integration_tests/src/main/python/json_matrix_test.py
@@ -723,8 +723,8 @@ def test_from_json_decs(std_input_path, input_file, dt):
pytest.param("boolean_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10479')),
pytest.param("invalid_ridealong_columns.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/10534')),
pytest.param("int_array_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278')),
"int_struct_formatted.json",
pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278'))])
pytest.param("int_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/rapidsai/cudf/issues/15278')),
pytest.param("int_mixed_array_struct_formatted.json", marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/11154'))])
@pytest.mark.parametrize('read_func', [read_json_df])
def test_scan_json_strings(std_input_path, read_func, spark_tmp_table_factory, input_file):
assert_gpu_and_cpu_are_equal_collect(
10 changes: 7 additions & 3 deletions integration_tests/src/main/python/logic_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error
from data_gen import *
from spark_session import is_before_spark_330
from spark_session import is_before_spark_330, is_before_spark_400
from marks import incompat, approximate_float
from pyspark.sql.types import *
import pyspark.sql.functions as f
@@ -74,12 +74,16 @@ def do_it(spark, lhs_bool_arg, arith_arg, op):
ansi_conf = {'spark.sql.ansi.enabled': ansi_enabled}
bypass_map = {'AND': 'a', 'OR': 'b'}
expect_error = int_arg == INT_MAX and (lhs_arg == 'NULL' or bypass_map[logic_op] != lhs_arg)
exception = \
"java.lang.ArithmeticException" if is_before_spark_330() else \
"SparkArithmeticException" if is_before_spark_400() else \
"pyspark.errors.exceptions.captured.ArithmeticException"

if ansi_enabled == 'true' and expect_error:
assert_gpu_and_cpu_error(
df_fun=lambda spark: do_it(spark, lhs_arg, int_arg, logic_op).collect(),
conf=ansi_conf,
error_message="java.lang.ArithmeticException" if is_before_spark_330() else "SparkArithmeticException")
error_message=exception)
else:
assert_gpu_and_cpu_are_equal_collect(
func=lambda spark: do_it(spark, lhs_arg, int_arg, logic_op),
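As an aside for anyone reusing this pattern in other ANSI tests, a minimal sketch (assuming the same spark_session helpers are importable) of selecting the expected overflow exception by Spark version:

# Sketch only: assumes spark_session exposes is_before_spark_330 and is_before_spark_400.
from spark_session import is_before_spark_330, is_before_spark_400

def expected_ansi_overflow_exception():
    # Spark < 3.3.0 surfaces the raw Java exception, 3.3.x-3.5.x wrap it in
    # SparkArithmeticException, and Spark 4.0 reports it through the PySpark
    # captured-exception classes.
    if is_before_spark_330():
        return "java.lang.ArithmeticException"
    if is_before_spark_400():
        return "SparkArithmeticException"
    return "pyspark.errors.exceptions.captured.ArithmeticException"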
7 changes: 6 additions & 1 deletion integration_tests/src/main/python/misc_expr_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
from marks import incompat, approximate_float
from pyspark.sql.types import *
import pyspark.sql.functions as f
from spark_session import is_before_spark_400

def test_mono_id():
assert_gpu_and_cpu_are_equal_collect(
@@ -32,6 +33,10 @@ def test_part_id():
f.col('a'),
f.spark_partition_id()))


@pytest.mark.skipif(condition=not is_before_spark_400(),
reason="raise_error() not currently implemented for Spark 4.0. "
"See https://github.com/NVIDIA/spark-rapids/issues/10107.")
def test_raise_error():
data_gen = ShortGen(nullable=False, min_val=0, max_val=20, special_cases=[])
assert_gpu_and_cpu_are_equal_collect(
11 changes: 8 additions & 3 deletions integration_tests/src/main/python/orc_write_test.py
@@ -15,7 +15,7 @@
import pytest

from asserts import assert_gpu_and_cpu_writes_are_equal_collect, assert_gpu_fallback_write
from spark_session import is_before_spark_320, is_spark_321cdh, is_spark_cdh, with_cpu_session, with_gpu_session
from spark_session import is_before_spark_320, is_before_spark_400, is_spark_321cdh, is_spark_cdh, with_cpu_session, with_gpu_session
from conftest import is_not_utc
from datetime import date, datetime, timezone
from data_gen import *
@@ -357,6 +357,11 @@ def test_orc_write_column_name_with_dots(spark_tmp_path):
@ignore_order
def test_orc_do_not_lowercase_columns(spark_tmp_path):
data_path = spark_tmp_path + "/ORC_DATA"

# The wording of the `is not exists` error message in Spark 4.x is unfortunate, but accurate:
# https://github.com/apache/spark/blob/4501285a49e4c0429c9cf2c105f044e1c8a93d21/python/pyspark/errors/error-conditions.json#L487
expected_error_message = "No StructField named acol" if is_before_spark_400() else \
"Key `acol` is not exists."
assert_gpu_and_cpu_writes_are_equal_collect(
# column is uppercase
lambda spark, path: spark.range(0, 1000).select(col("id").alias("Acol")).write.orc(path),
@@ -367,10 +372,10 @@ def test_orc_do_not_lowercase_columns(spark_tmp_path):
with_cpu_session(lambda spark: spark.read.orc(data_path + "/CPU").schema["acol"])
assert False
except KeyError as e:
assert "No StructField named acol" in str(e)
assert expected_error_message in str(e)
try:
# reading lowercase causes exception
with_gpu_session(lambda spark: spark.read.orc(data_path + "/GPU").schema["acol"])
assert False
except KeyError as e:
assert "No StructField named acol" in str(e)
assert expected_error_message in str(e)
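For context, a standalone illustration (hypothetical, not part of this PR) of the KeyError wording the test now accepts on each Spark line:

# Hypothetical illustration: StructType lookup by name is case-sensitive and
# raises KeyError, with version-dependent wording.
from pyspark.sql.types import StructType, StructField, LongType

schema = StructType([StructField("Acol", LongType())])
try:
    schema["acol"]  # lower-case name does not match "Acol"
except KeyError as e:
    # Spark 3.x: "No StructField named acol"
    # Spark 4.x: "Key `acol` is not exists."
    print(e)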
45 changes: 42 additions & 3 deletions integration_tests/src/main/python/repart_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2023, NVIDIA CORPORATION.
# Copyright (c) 2020-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect
from spark_session import is_before_spark_320, is_before_spark_330
from conftest import is_not_utc
from data_gen import *
@@ -266,7 +266,6 @@ def test_hash_fallback(data_gen):
([('a', long_gen), ('b', StructGen([('b1', long_gen)]))], ['a']),
([('a', long_gen), ('b', ArrayGen(long_gen, max_length=2))], ['a']),
([('a', byte_gen)], [f.col('a') - 5]),
([('a', long_gen)], [f.col('a') + 15]),
([('a', ArrayGen(long_gen, max_length=2)), ('b', long_gen)], ['a']),
([('a', StructGen([('aa', ArrayGen(long_gen, max_length=2))])), ('b', long_gen)], ['a']),
([('a', byte_gen), ('b', boolean_gen)], ['a', 'b']),
@@ -291,6 +290,46 @@ def test_hash_repartition_exact(gen, num_parts):
.withColumn('hashed', f.hash(*part_on))\
.selectExpr('*', 'pmod(hashed, {})'.format(num_parts)))


@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [1, 2, 10, 17, 19, 32], ids=idfn)
@pytest.mark.parametrize('is_ansi_mode', [False, True], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_hash_repartition_exact_longs_no_overflow(num_parts, is_ansi_mode):
gen = LongGen(min_val=-1000, max_val=1000, special_cases=[]) if is_ansi_mode else long_gen
data_gen = [('a', gen)]
part_on = [f.col('a') + 15]
conf = {'spark.sql.ansi.enabled': is_ansi_mode}

assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=1024)
.repartition(num_parts, *part_on)
.withColumn('id', f.spark_partition_id())
.withColumn('hashed', f.hash(*part_on))
.selectExpr('*', 'pmod(hashed, {})'.format(num_parts)), conf=conf)


@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [17], ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_hash_repartition_long_overflow_ansi_exception(num_parts):
data_gen = [('a', long_gen)]
part_on = [f.col('a') + 15]
conf = ansi_enabled_conf

def test_function(spark):
return gen_df(spark, data_gen, length=1024) \
.withColumn('plus15', f.col('a') + 15) \
.repartition(num_parts, f.col('plus15')) \
.withColumn('id', f.spark_partition_id()) \
.withColumn('hashed', f.hash(*part_on)) \
.selectExpr('*', 'pmod(hashed, {})'.format(num_parts))

assert_gpu_and_cpu_error(
lambda spark: test_function(spark).collect(),
conf=conf, error_message="ArithmeticException")


# Test a query that should cause Spark to leverage getShuffleRDD
@ignore_order(local=True)
def test_union_with_filter():
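A quick, hypothetical repro (not part of the PR) of why the full-range long generator cannot simply run under ANSI mode and had to move into its own exception test:

# Hypothetical repro: with ANSI mode on, LongType addition raises on overflow
# instead of wrapping, so 'a + 15' near Long.MaxValue fails.
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.ansi.enabled", "true")
df = spark.createDataFrame([(9223372036854775807,)], ["a"])  # Long.MaxValue
df.select(f.col("a") + 15).collect()  # raises an ArithmeticException ("long overflow")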
34 changes: 28 additions & 6 deletions integration_tests/src/main/python/time_window_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2022, NVIDIA CORPORATION.
# Copyright (c) 2021-2024, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
from pyspark.sql.types import *
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from spark_session import is_before_spark_330

# do it over a day so we have more chance of overlapping values
_restricted_start = datetime(2020, 1, 1, tzinfo=timezone.utc)
@@ -43,14 +44,35 @@ def test_grouped_tumbling_window(data_gen):
def test_grouped_sliding_window(data_gen):
row_gen = StructGen([['ts', _restricted_ts_gen],['data', data_gen]], nullable=False)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max("data").alias("max_data")))
lambda spark: gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max("data").alias("max_data")))

@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)

@pytest.mark.parametrize('is_ansi_enabled',
[False,
pytest.param(True,
marks=pytest.mark.skipif(
condition=is_before_spark_330(),
reason="Prior to Spark 3.3.0, time interval calculations included "
"multiplication/division. This makes interval operations susceptible "
"to overflow-related exceptions when in ANSI mode. "
"Spark versions >= 3.3.0 do the same calculations via Mod. "
"Running this test in ANSI mode on Spark < 3.3.0 will cause aggregation "
"operations like Product to fall back to CPU. "
"See https://github.com/NVIDIA/spark-rapids/issues/5114."))])
@pytest.mark.parametrize('data_gen', [byte_gen, long_gen, string_gen], ids=idfn)
@ignore_order
def test_grouped_sliding_window_array(data_gen):
row_gen = StructGen([['ts', _restricted_ts_gen],['data', ArrayGen(data_gen)]], nullable=False)
def test_grouped_sliding_window_array(data_gen, is_ansi_enabled):
"""
When in ANSI mode, only valid indices are used.
Tests for accessing arrays with invalid indices are done in array_test.py.
"""
array_gen = ArrayGen(data_gen, min_length=4 if is_ansi_enabled else 0)
conf = {'spark.sql.ansi.enabled': is_ansi_enabled}
row_gen = StructGen([['ts', _restricted_ts_gen], ['data', array_gen]], nullable=False)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max(f.col("data")[3]).alias("max_data")))
lambda spark: gen_df(spark, row_gen).groupBy(f.window('ts', '5 hour', '1 hour')).agg(f.max(f.col("data")[3]).alias("max_data")),
conf=conf)


@pytest.mark.parametrize('data_gen', integral_gens + [string_gen], ids=idfn)
@ignore_order
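Why min_length=4 matters under ANSI mode, as a hypothetical standalone illustration (not from this PR): indexing past the end of an array returns null with ANSI off, but raises with ANSI on, so the generated arrays must always have an element at index 3.

# Hypothetical illustration of ANSI array-index behavior.
from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([([1, 2],)], ["data"])

spark.conf.set("spark.sql.ansi.enabled", "false")
df.select(f.col("data")[3]).show()      # out-of-bounds index -> null

spark.conf.set("spark.sql.ansi.enabled", "true")
df.select(f.col("data")[3]).collect()   # out-of-bounds index -> INVALID_ARRAY_INDEX error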