[SPARK-22159][SQL][FOLLOW-UP] Make config names consistently end with "enabled". #19462

Closed · wants to merge 1 commit
4 changes: 2 additions & 2 deletions python/pyspark/sql/dataframe.py
@@ -1878,7 +1878,7 @@ def toPandas(self):
1 5 Bob
"""
import pandas as pd
-if self.sql_ctx.getConf("spark.sql.execution.arrow.enable", "false").lower() == "true":
+if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
try:
import pyarrow
tables = self._collectAsArrow()
@@ -1889,7 +1889,7 @@
return pd.DataFrame.from_records([], columns=self.columns)
except ImportError as e:
msg = "note: pyarrow must be installed and available on calling Python process " \
"if using spark.sql.execution.arrow.enable=true"
"if using spark.sql.execution.arrow.enabled=true"
raise ImportError("%s\n%s" % (e.message, msg))
else:
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
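For context, a minimal sketch of the renamed flag in use, assuming a running SparkSession and an installed pyarrow; the DataFrame below is illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("arrow-rename-demo").getOrCreate()
df = spark.range(1000).selectExpr("id", "id * 2 AS doubled")

# The key now ends with "enabled", consistent with other SQL configs.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf_arrow = df.toPandas()  # Arrow path, taken when pyarrow imports cleanly

spark.conf.set("spark.sql.execution.arrow.enabled", "false")
pdf_plain = df.toPandas()  # plain collect() fallback path

assert pdf_arrow.equals(pdf_plain)
```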
6 changes: 3 additions & 3 deletions python/pyspark/sql/tests.py
@@ -3088,7 +3088,7 @@ class ArrowTests(ReusedPySparkTestCase):
def setUpClass(cls):
ReusedPySparkTestCase.setUpClass()
cls.spark = SparkSession(cls.sc)
-cls.spark.conf.set("spark.sql.execution.arrow.enable", "true")
+cls.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
cls.schema = StructType([
StructField("1_str_t", StringType(), True),
StructField("2_int_t", IntegerType(), True),
@@ -3120,9 +3120,9 @@ def test_null_conversion(self):

def test_toPandas_arrow_toggle(self):
df = self.spark.createDataFrame(self.data, schema=self.schema)
-self.spark.conf.set("spark.sql.execution.arrow.enable", "false")
+self.spark.conf.set("spark.sql.execution.arrow.enabled", "false")
pdf = df.toPandas()
-self.spark.conf.set("spark.sql.execution.arrow.enable", "true")
+self.spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf_arrow = df.toPandas()
self.assertFramesEqual(pdf_arrow, pdf)

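The toggle test above can also be written as a standalone check using pandas' own comparison helper instead of the suite's assertFramesEqual; a sketch, where `spark`, `df`, and the function name are assumptions:

```python
import pandas.testing as pdt

def check_toPandas_arrow_toggle(spark, df):
    # Collect once per setting of the renamed flag, then compare the frames.
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    pdf = df.toPandas()
    spark.conf.set("spark.sql.execution.arrow.enabled", "true")
    pdf_arrow = df.toPandas()
    pdt.assert_frame_equal(pdf_arrow, pdf)  # raises AssertionError on mismatch
```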
@@ -539,7 +539,7 @@ case class HashAggregateExec(
private def enableTwoLevelHashMap(ctx: CodegenContext) = {
if (!checkIfFastHashMapSupported(ctx)) {
if (modes.forall(mode => mode == Partial || mode == PartialMerge) && !Utils.isTesting) {
-logInfo("spark.sql.codegen.aggregate.map.twolevel.enable is set to true, but"
+logInfo("spark.sql.codegen.aggregate.map.twolevel.enabled is set to true, but"
+ " current version of codegened fast hashmap does not support this aggregate.")
}
} else {
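The renamed two-level map flag is also settable at runtime from PySpark; a rough sketch, assuming a SparkSession named `spark` (the aggregation itself is illustrative):

```python
from pyspark.sql import functions as F

# Renamed key from this PR; it guards the fast two-level hash map for
# partial aggregation, the path checked by enableTwoLevelHashMap above.
spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")

# Any hash aggregate exercises the guarded code path.
spark.range(1000000) \
    .groupBy((F.col("id") % 100).alias("k")) \
    .count() \
    .collect()
```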
@@ -24,29 +24,29 @@ import org.apache.spark.SparkConf
class SingleLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
override protected def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.codegen.fallback", "false")
-.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
+.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")

// adding some checking after each test is run, assuring that the configs are not changed
// in test code
after {
assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
"configuration parameter changed in test body")
-assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "false",
+assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "false",
"configuration parameter changed in test body")
}
}

class TwoLevelAggregateHashMapSuite extends DataFrameAggregateSuite with BeforeAndAfter {
override protected def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.codegen.fallback", "false")
-.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")

// adding some checking after each test is run, assuring that the configs are not changed
// in test code
after {
assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
"configuration parameter changed in test body")
-assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "true",
+assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "true",
"configuration parameter changed in test body")
}
}
@@ -57,15 +57,15 @@ class TwoLevelAggregateHashMapWithVectorizedMapSuite

override protected def sparkConf: SparkConf = super.sparkConf
.set("spark.sql.codegen.fallback", "false")
-.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")

// adding some checking after each test is run, assuring that the configs are not changed
// in test code
after {
assert(sparkConf.get("spark.sql.codegen.fallback") == "false",
"configuration parameter changed in test body")
-assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enable") == "true",
+assert(sparkConf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "true",
"configuration parameter changed in test body")
assert(sparkConf.get("spark.sql.codegen.aggregate.map.vectorized.enable") == "true",
"configuration parameter changed in test body")
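In PySpark, the closest analogue to pinning these configs for a whole suite is setting them when the session is built; a sketch using the renamed key:

```python
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .config("spark.sql.codegen.fallback", "false")
         .config("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
         .getOrCreate())

# As in the suites above, the values can be re-checked after each test.
assert spark.conf.get("spark.sql.codegen.aggregate.map.twolevel.enabled") == "true"
```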
@@ -107,14 +107,14 @@ class AggregateBenchmark extends BenchmarkBase {

benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}

benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
f()
}
@@ -149,14 +149,14 @@ class AggregateBenchmark extends BenchmarkBase {

benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}

benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", value = true)
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
f()
}
@@ -189,14 +189,14 @@ class AggregateBenchmark extends BenchmarkBase {

benchmark.addCase(s"codegen = T hashmap = F", numIters = 3) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}

benchmark.addCase(s"codegen = T hashmap = T", numIters = 5) { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
f()
}
@@ -228,14 +228,14 @@ class AggregateBenchmark extends BenchmarkBase {

benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}

benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
f()
}
@@ -277,14 +277,14 @@ class AggregateBenchmark extends BenchmarkBase {

benchmark.addCase(s"codegen = T hashmap = F") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "false")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "false")
f()
}

benchmark.addCase(s"codegen = T hashmap = T") { iter =>
sparkSession.conf.set("spark.sql.codegen.wholeStage", "true")
-sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enable", "true")
+sparkSession.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", "true")
sparkSession.conf.set("spark.sql.codegen.aggregate.map.vectorized.enable", "true")
f()
}
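Outside the Benchmark harness, a quick-and-dirty timing of the same toggle might look like the sketch below; `time_agg` is a hypothetical helper, `spark` is assumed, and the numbers are illustrative only:

```python
import time
from pyspark.sql import functions as F

def time_agg(spark, enabled):
    # Flip the renamed flag, then time one wide aggregation end to end.
    spark.conf.set("spark.sql.codegen.aggregate.map.twolevel.enabled", enabled)
    df = spark.range(20000000).groupBy((F.col("id") % 1024).alias("k")).agg(F.sum("id"))
    start = time.time()
    df.collect()
    return time.time() - start

print("twolevel off: %.2fs, on: %.2fs" % (time_agg(spark, "false"), time_agg(spark, "true")))
```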
@@ -1015,7 +1015,7 @@ class HashAggregationQueryWithControlledFallbackSuite extends AggregationQuerySu

override protected def checkAnswer(actual: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
Seq("true", "false").foreach { enableTwoLevelMaps =>
-withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enable" ->
+withSQLConf("spark.sql.codegen.aggregate.map.twolevel.enabled" ->
enableTwoLevelMaps) {
(1 to 3).foreach { fallbackStartsAt =>
withSQLConf("spark.sql.TungstenAggregate.testFallbackStartsAt" ->
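withSQLConf sets a key only for the duration of a block and restores it afterwards; a PySpark analogue, sketched as a context manager (the helper name is hypothetical, `spark` is assumed):

```python
from contextlib import contextmanager

@contextmanager
def with_sql_conf(spark, key, value):
    # Mirror of Scala's withSQLConf: set, run the body, then restore.
    old = spark.conf.get(key, None)
    spark.conf.set(key, value)
    try:
        yield
    finally:
        if old is None:
            spark.conf.unset(key)
        else:
            spark.conf.set(key, old)

# Usage, mirroring the suite above:
# with with_sql_conf(spark, "spark.sql.codegen.aggregate.map.twolevel.enabled", "true"):
#     ...  # run the query and check the answer under this setting
```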