
Commit

Merge branch 'branch-2.4' of https://github.com/apache/spark into branch-2.4
AngersZhuuuu committed Jan 6, 2021
2 parents ee9f7e7 + 3e6a6b7 commit 52498b6
Showing 17 changed files with 327 additions and 30 deletions.
1 change: 1 addition & 0 deletions .github/workflows/publish_snapshot.yml
@@ -7,6 +7,7 @@ on:

jobs:
publish-snapshot:
if: github.repository == 'apache/spark'
runs-on: ubuntu-latest
steps:
- name: Checkout Spark repository
@@ -408,10 +408,12 @@ public void remove() {
}

private void handleFailedDelete() {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists() && !file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
if (spillWriters.size() > 0) {
// remove the spill file from disk
File file = spillWriters.removeFirst().getFile();
if (file != null && file.exists() && !file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
}
}
}
}
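A likely reason for the guard above (an assumption; the diff itself does not state it): spillWriters appears to be a java.util.LinkedList, and LinkedList.removeFirst() throws NoSuchElementException on an empty list, so a second cleanup attempt after every spill file has already been removed, for example a repeated hasNext() call on an exhausted iterator as the new test below exercises, would previously fail. A minimal Scala sketch of the pattern, with illustrative names:

    import java.util.LinkedList

    val writers = new LinkedList[String]()
    // writers.removeFirst()      // unguarded: throws NoSuchElementException when the list is empty
    if (writers.size() > 0) {     // guarded, as in the fix above: a no-op once the list is drained
      writers.removeFirst()
    }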
40 changes: 40 additions & 0 deletions core/src/main/scala/org/apache/spark/ContextAwareIterator.scala
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* A TaskContext aware iterator.
*
* As the Python evaluation consumes the parent iterator in a separate thread,
* it could consume more data from the parent even after the task ends and the parent is closed.
* If an off-heap access exists in the parent iterator, it could cause segmentation fault
* which crashes the executor.
* Thus, we should use [[ContextAwareIterator]] to stop consuming after the task ends.
*/
@DeveloperApi
class ContextAwareIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {

override def hasNext: Boolean =
!context.isCompleted() && !context.isInterrupted() && delegate.hasNext

override def next(): T = delegate.next()
}
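A minimal usage sketch of the new class (the wrapper function and names below are illustrative, not part of this commit): code running inside a task wraps its partition iterator so that a consumer on another thread stops pulling rows once the task completes or is interrupted, mirroring the pattern EvalPythonExec adopts later in this commit.

    import org.apache.spark.{ContextAwareIterator, TaskContext}

    def wrapForLateConsumers[T](rows: Iterator[T]): Iterator[T] = {
      // Must run on the task thread so TaskContext.get() returns the active context.
      val context = TaskContext.get()
      // hasNext turns false as soon as the task ends, so a background consumer
      // never touches off-heap memory the task may already have released.
      new ContextAwareIterator(context, rows)
    }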
23 changes: 17 additions & 6 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -522,10 +522,14 @@ private[spark] class SparkSubmit extends Logging {
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
confKey = "spark.executor.instances"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

@@ -586,7 +590,13 @@ private[spark] class SparkSubmit extends Logging {
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
if (opt.confKey != null) {
if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
} else {
sparkConf.set(opt.confKey, opt.value)
}
}
}
}

@@ -1345,4 +1355,5 @@ private case class OptionAssigner(
clusterManager: Int,
deployMode: Int,
clOption: String = null,
confKey: String = null)
confKey: String = null,
mergeFn: Option[(String, String) => String] = None)
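A rough illustration of the merge semantics introduced above (this mergeFileLists is a simplified stand-in for SparkSubmit's helper of the same name): when a resource list arrives both through a flag such as --jars and through an explicit --conf spark.yarn.dist.jars, the two comma-separated lists are concatenated instead of the flag silently overwriting the conf value.

    def mergeFileLists(existing: String, incoming: String): String =
      (existing.split(",") ++ incoming.split(",")).filter(_.nonEmpty).mkString(",")

    // e.g. --conf spark.yarn.dist.jars=hdfs:/a.jar together with --jars hdfs:/b.jar,hdfs:/c.jar
    val merged = mergeFileLists("hdfs:/a.jar", "hdfs:/b.jar,hdfs:/c.jar")
    // merged == "hdfs:/a.jar,hdfs:/b.jar,hdfs:/c.jar"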
@@ -560,6 +560,8 @@ public void spillInIterator() throws IOException {
iter2.next();
}
assertFalse(iter2.hasNext());
// calls hasNext twice deliberately, make sure it's idempotent
assertFalse(iter2.hasNext());
} finally {
map.free();
for (File spillFile : spillFilesCreated) {
80 changes: 78 additions & 2 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -18,11 +18,10 @@
package org.apache.spark.deploy

import java.io._
import java.net.URI
import java.net.{URI, URL}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.io.Source

@@ -874,6 +873,83 @@ class SparkSubmitSuite
(Set(archive1.toURI.toString, archive2.toURI.toString))
}

test("SPARK-27575: yarn confs should merge new value with existing value") {
val tmpJarDir = Utils.createTempDir()
val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)

val tmpJarDirYarnOpt = Utils.createTempDir()
val jar1YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "2"), tmpJarDirYarnOpt)
val jar2YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "USER2"),
tmpJarDirYarnOpt)

val tmpFileDir = Utils.createTempDir()
val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)

val tmpFileDirYarnOpt = Utils.createTempDir()
val file1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpFileDirYarnOpt)
val file2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpFileDirYarnOpt)

val tmpPyFileDir = Utils.createTempDir()
val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)

val tmpPyFileDirYarnOpt = Utils.createTempDir()
val pyFile1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpPyFileDirYarnOpt)
val pyFile2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpPyFileDirYarnOpt)

val tmpArchiveDir = Utils.createTempDir()
val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)

val tmpArchiveDirYarnOpt = Utils.createTempDir()
val archive1YarnOpt = File.createTempFile("archive1YarnOpt", ".zip", tmpArchiveDirYarnOpt)
val archive2YarnOpt = File.createTempFile("archive2YarnOpt", ".zip", tmpArchiveDirYarnOpt)

val tempPyFile = File.createTempFile("tmpApp", ".py")
tempPyFile.deleteOnExit()

val args = Seq(
"--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
"--name", "testApp",
"--master", "yarn",
"--deploy-mode", "client",
"--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
"--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
"--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
"--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
"--conf", "spark.yarn.dist.files=" +
s"${Seq(file1YarnOpt, file2YarnOpt).map(_.toURI.toString).mkString(",")}",
"--conf", "spark.yarn.dist.pyFiles=" +
s"${Seq(pyFile1YarnOpt, pyFile2YarnOpt).map(_.toURI.toString).mkString(",")}",
"--conf", "spark.yarn.dist.jars=" +
s"${Seq(jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).mkString(",")}",
"--conf", "spark.yarn.dist.archives=" +
s"${Seq(archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString).mkString(",")}",
tempPyFile.toURI().toString())

def assertEqualsWithURLs(expected: Set[URL], confValue: String): Unit = {
val confValPaths = confValue.split(",").map(new Path(_)).toSet
assert(expected.map(u => new Path(u.toURI)) === confValPaths)
}

def assertEqualsWithFiles(expected: Set[File], confValue: String): Unit = {
assertEqualsWithURLs(expected.map(_.toURI.toURL), confValue)
}

val appArgs = new SparkSubmitArguments(args)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
assertEqualsWithURLs(
Set(jar1, jar2, jar1YarnOpt, jar2YarnOpt), conf.get("spark.yarn.dist.jars"))
assertEqualsWithFiles(
Set(file1, file2, file1YarnOpt, file2YarnOpt), conf.get("spark.yarn.dist.files"))
assertEqualsWithFiles(
Set(pyFile1, pyFile2, pyFile1YarnOpt, pyFile2YarnOpt), conf.get("spark.yarn.dist.pyFiles"))
assertEqualsWithFiles(Set(archive1, archive2, archive1YarnOpt, archive2YarnOpt),
conf.get("spark.yarn.dist.archives"))
}

// scalastyle:on println

private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = {
4 changes: 2 additions & 2 deletions dev/deps/spark-deps-hadoop-3.1
@@ -116,8 +116,8 @@ jersey-container-servlet/2.22.2//jersey-container-servlet-2.22.2.jar
jersey-guava/2.22.2//jersey-guava-2.22.2.jar
jersey-media-jaxb/2.22.2//jersey-media-jaxb-2.22.2.jar
jersey-server/2.22.2//jersey-server-2.22.2.jar
jetty-webapp/9.4.28.v20200408//jetty-webapp-9.4.28.v20200408.jar
jetty-xml/9.4.28.v20200408//jetty-xml-9.4.28.v20200408.jar
jetty-webapp/9.4.34.v20201102//jetty-webapp-9.4.34.v20201102.jar
jetty-xml/9.4.34.v20201102//jetty-xml-9.4.34.v20201102.jar
jline/2.14.6//jline-2.14.6.jar
joda-time/2.9.3//joda-time-2.9.3.jar
jodd-core/3.5.2//jodd-core-3.5.2.jar
6 changes: 6 additions & 0 deletions docs/sql-migration-guide-upgrade.md
@@ -7,6 +7,12 @@ displayTitle: Spark SQL Upgrading Guide
* Table of contents
{:toc}

## Upgrading from Spark SQL 2.4.7 to 2.4.8

- In Spark 2.4.8, `AnalysisException` is replaced by its sub-classes that are thrown for tables from Hive external catalog in the following situations:
* `ALTER TABLE .. ADD PARTITION` throws `PartitionsAlreadyExistException` if new partition exists already
* `ALTER TABLE .. DROP PARTITION` throws `NoSuchPartitionsException` for not existing partitions

## Upgrading from Spark SQL 2.4.5 to 2.4.6

- In Spark 2.4.6, the `RESET` command does not reset the static SQL configuration values to the default. It only clears the runtime SQL configuration values.
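A hedged sketch of what the 2.4.8 note above means in practice for tables in the Hive external catalog (the import path for the exception class is assumed here; both new classes remain sub-classes of AnalysisException, so existing catch blocks keep working):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException

    val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
    spark.sql("CREATE TABLE t (id INT) PARTITIONED BY (p STRING)")
    spark.sql("ALTER TABLE t ADD PARTITION (p = 'a')")
    try {
      spark.sql("ALTER TABLE t ADD PARTITION (p = 'a')")   // partition already exists
    } catch {
      // Before 2.4.8 this surfaced as a plain AnalysisException.
      case e: PartitionsAlreadyExistException => println(s"already exists: ${e.getMessage}")
    }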
5 changes: 5 additions & 0 deletions python/run-tests.py
@@ -85,11 +85,16 @@ def run_individual_python_test(target_dir, test_name, pyspark_python):
tmp_dir = os.path.join(target_dir, str(uuid.uuid4()))
os.mkdir(tmp_dir)
env["TMPDIR"] = tmp_dir
metastore_dir = os.path.join(tmp_dir, str(uuid.uuid4()))
while os.path.isdir(metastore_dir):
metastore_dir = os.path.join(metastore_dir, str(uuid.uuid4()))
os.mkdir(metastore_dir)

# Also override the JVM's temp directory by setting driver and executor options.
spark_args = [
"--conf", "spark.driver.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
"--conf", "spark.executor.extraJavaOptions=-Djava.io.tmpdir={0}".format(tmp_dir),
"--conf", "spark.sql.warehouse.dir='{0}'".format(metastore_dir),
"pyspark-shell"
]
env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
@@ -344,14 +344,11 @@ object JoinReorderDP extends PredicateHelper with Logging {
}

def betterThan(other: JoinPlan, conf: SQLConf): Boolean = {
if (other.planCost.card == 0 || other.planCost.size == 0) {
false
} else {
val relativeRows = BigDecimal(this.planCost.card) / BigDecimal(other.planCost.card)
val relativeSize = BigDecimal(this.planCost.size) / BigDecimal(other.planCost.size)
relativeRows * conf.joinReorderCardWeight +
relativeSize * (1 - conf.joinReorderCardWeight) < 1
}
val thisCost = BigDecimal(this.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(this.planCost.size) * (1 - conf.joinReorderCardWeight)
val otherCost = BigDecimal(other.planCost.card) * conf.joinReorderCardWeight +
BigDecimal(other.planCost.size) * (1 - conf.joinReorderCardWeight)
thisCost < otherCost
}
}
}
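Hedged restatement of the new comparison above: each candidate plan now gets an absolute weighted cost, card * weight + size * (1 - weight), and the two costs are compared directly, which keeps the "better than" relation consistent (a plan is never better than itself, and the comparison no longer depends on ratios against the other plan). A small Scala sketch using the default cardinality weight of 0.7 (assumed), matching the arithmetic in the new test below:

    def weightedCost(card: Long, size: Long, cardWeight: Double = 0.7): BigDecimal =
      BigDecimal(card) * BigDecimal(cardWeight) + BigDecimal(size) * BigDecimal(1 - cardWeight)

    val cost1 = weightedCost(card = 300, size = 80)   // 300*0.7 + 80*0.3 ≈ 234
    val cost2 = weightedCost(card = 500, size = 30)   // 500*0.7 + 30*0.3 ≈ 359
    assert(cost1 < cost2)        // plan1 is better than plan2
    assert(!(cost1 < cost1))     // a plan is never better than itself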
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.optimizer.JoinReorderDP.JoinPlan
import org.apache.spark.sql.catalyst.plans.{Cross, Inner, PlanTest}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -315,4 +316,18 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
case (a1, a2) => a1.semanticEquals(a2)
}
}

test("SPARK-33935: betterThan should be consistent") {
val plan1 = JoinPlan(null, null, null, Cost(300, 80))
val plan2 = JoinPlan(null, null, null, Cost(500, 30))

// cost1 = 300*0.7 + 80*0.3 = 234
// cost2 = 500*0.7 + 30*0.3 = 359

assert(!plan1.betterThan(plan1, conf))
assert(!plan2.betterThan(plan2, conf))

assert(plan1.betterThan(plan2, conf))
assert(!plan2.betterThan(plan1, conf))
}
}
@@ -293,12 +293,12 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase {
(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))

val expected =
f1.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk")))
.join(t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1"))), Inner,
Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
t3.join(t4, Inner, Some(nameToAttr("t3_c1") === nameToAttr("t4_c1")))
.join(t1.join(t2, Inner, Some(nameToAttr("t1_c1") === nameToAttr("t2_c1"))), Inner,
Some(nameToAttr("t1_c2") === nameToAttr("t4_c2")))
.join(f1
.join(d2, Inner, Some(nameToAttr("f1_fk2") === nameToAttr("d2_pk")))
.join(d1, Inner, Some(nameToAttr("f1_fk1") === nameToAttr("d1_pk"))))
.select(outputsOf(d1, t1, t2, t3, t4, f1, d2): _*)

assertEqualPlans(query, expected)
@@ -54,6 +54,8 @@ public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
} else {
if (t == DataTypes.BooleanType) {
col.putBooleans(0, capacity, row.getBoolean(fieldIdx));
} else if (t == DataTypes.BinaryType) {
col.putByteArray(0, row.getBinary(fieldIdx));
} else if (t == DataTypes.ByteType) {
col.putBytes(0, capacity, row.getByte(fieldIdx));
} else if (t == DataTypes.ShortType) {
@@ -94,6 +96,9 @@
col.putInts(0, capacity, row.getInt(fieldIdx));
} else if (t instanceof TimestampType) {
col.putLongs(0, capacity, row.getLong(fieldIdx));
} else {
throw new RuntimeException(String.format("DataType %s is not supported" +
" in column vectorized reader.", t.sql()));
}
}
}
@@ -21,7 +21,7 @@ import java.io.File

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.{ContextAwareIterator, SparkEnv, TaskContext}
import org.apache.spark.api.python.ChainedPythonFunctions
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -88,6 +88,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil

inputRDD.mapPartitions { iter =>
val context = TaskContext.get()
val contextAwareIterator = new ContextAwareIterator(context, iter)

// The queue used to buffer input rows so we can drain it to
// combine input with output from Python.
@@ -119,7 +120,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
})

// Add rows to queue to join later with the result.
val projectedRowIter = iter.map { inputRow =>
val projectedRowIter = contextAwareIterator.map { inputRow =>
queue.add(inputRow.asInstanceOf[UnsafeRow])
projection(inputRow)
}
26 changes: 26 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3114,6 +3114,32 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-33593: Vector reader got incorrect data with binary partition value") {
Seq("false", "true").foreach(value => {
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> value) {
withTable("t1") {
sql(
"""CREATE TABLE t1(name STRING, id BINARY, part BINARY)
|USING PARQUET PARTITIONED BY (part)""".stripMargin)
sql("INSERT INTO t1 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t1"),
Row("a", "Spark SQL", "Spark SQL"))
}
}

withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> value) {
withTable("t2") {
sql(
"""CREATE TABLE t2(name STRING, id BINARY, part BINARY)
|USING ORC PARTITIONED BY (part)""".stripMargin)
sql("INSERT INTO t2 PARTITION(part = 'Spark SQL') VALUES('a', X'537061726B2053514C')")
checkAnswer(sql("SELECT name, cast(id as string), cast(part as string) FROM t2"),
Row("a", "Spark SQL", "Spark SQL"))
}
}
})
}
}

case class Foo(bar: Option[String])
