[SPARK-3971][SQL] Backport #2843 to branch-1.1 #3113

Closed
wants to merge 7 commits into branch-1.1
16 changes: 7 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -20,10 +20,8 @@ package org.apache.spark.util
import java.io._
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}

import org.apache.log4j.PropertyConfigurator
import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.{Locale, Properties, Random, UUID}

import scala.collection.JavaConversions._
import scala.collection.Map
@@ -37,9 +35,10 @@ import com.google.common.io.{ByteStreams, Files}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
import tachyon.client.{TachyonFile,TachyonFS}
import tachyon.client.{TachyonFS, TachyonFile}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -952,8 +951,8 @@ private[spark] object Utils extends Logging {
*/
def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
val trace = Thread.currentThread.getStackTrace()
.filterNot { ste:StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
.filterNot { ste:StackTraceElement =>
// When running under some profilers, the current stack trace might contain some bogus
// frames. This is intended to ensure that we don't crash in these situations by
// ignoring any frames that we can't examine.
(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
@@ -1354,7 +1353,7 @@ private[spark] object Utils extends Logging {
}
}

/**
/**
* Execute the given block, logging and re-throwing any uncaught exception.
* This is particularly useful for wrapping code that runs in a thread, to ensure
* that exceptions are printed, and to avoid having to catch Throwable.
@@ -1551,7 +1550,6 @@ private[spark] object Utils extends Logging {
"%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
PropertyConfigurator.configure(pro)
}

}

Contributor

Is this file touched only for whitespace?

Contributor Author

Yes, these changes were introduced while resolving merge conflicts. Will revert them.

/**
5 changes: 5 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -75,6 +75,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
protected[sql] def executePlan(plan: LogicalPlan): this.QueryExecution =
new this.QueryExecution { val logical = plan }

sparkContext.getConf.getAll.foreach {
case (key, value) if key.startsWith("spark.sql") => setConf(key, value)
case _ =>
}

/**
* :: DeveloperApi ::
* Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
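For illustration, here is a minimal, hypothetical sketch (not part of this PR's diff) of the behavior the SQLContext change above introduces: every `spark.sql.*` entry set on the SparkConf is copied into the SQLConf of a newly constructed SQLContext. The key values below are invented for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setMaster("local")
  .setAppName("SqlConfPropagationSketch")
  .set("spark.sql.shuffle.partitions", "10") // starts with "spark.sql", so it is copied into SQLConf
  .set("spark.example.other", "x")           // does not start with "spark.sql", so it is skipped

val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Prints "10": the value was seeded from the SparkConf when the SQLContext was constructed.
println(sqlContext.getConf("spark.sql.shuffle.partitions", "<undefined>"))
```

The "propagate from spark conf" test added to SQLConfSuite below exercises the same path, using the `spark.sql.testkey` property that TestSQLContext now sets on its SparkConf.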
@@ -48,43 +48,35 @@ case class SetCommand(
extends LeafNode with Command with Logging {

override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
// Configures the deprecated "mapred.reduce.tasks" property.
case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), Some(v)) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
} else {
context.setConf(k, v)
Array(s"$k=$v")
}

// Query the value bound to key k.
case (Some(k), _) =>
// TODO (lian) This is just a workaround to make the Simba ODBC driver work.
// Should remove this once we get the ODBC driver updated.
if (k == "-v") {
val hiveJars = Seq(
"hive-exec-0.12.0.jar",
"hive-service-0.12.0.jar",
"hive-common-0.12.0.jar",
"hive-hwi-0.12.0.jar",
"hive-0.12.0.jar").mkString(":")

Array(
"system:java.class.path=" + hiveJars,
"system:sun.java.command=shark.SharkServer2")
}
else {
Array(s"$k=${context.getConf(k, "<undefined>")}")
}

// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAllConfs.map { case (k, v) =>
s"$k=$v"
}.toSeq
context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")

// Configures a single property.
case (Some(k), Some(v)) =>
context.setConf(k, v)
Seq(s"$k=$v")

// Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
// from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
// while "SET -v" returns all properties.)
case (Some("-v") | None, None) =>
context.getAllConfs.map { case (k, v) => s"$k=$v" }.toSeq

// Queries the deprecated "mapred.reduce.tasks" property.
case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), None) =>
logWarning(
s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")

// Queries a single property.
case (Some(k), None) =>
Seq(s"$k=${context.getConf(k, "<undefined>")}")

case _ =>
throw new IllegalArgumentException()
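As a hedged usage sketch (not part of this PR's diff), the rewritten SetCommand above distinguishes the following forms of SET when issued through `SQLContext.sql`; the property values are invented for the example.

```scala
// Assumes a SQLContext named sqlContext, as in the sketch after the SQLContext diff above.

// Sets a property; the command's result row reads "spark.sql.shuffle.partitions=10".
sqlContext.sql("SET spark.sql.shuffle.partitions=10")

// Queries a single property; yields "spark.sql.shuffle.partitions=10",
// or "<undefined>" for a key that was never set.
sqlContext.sql("SET spark.sql.shuffle.partitions")

// Lists every key-value pair held in the SQLConf. Unlike Hive, "SET -v" is an alias of plain "SET".
sqlContext.sql("SET")
sqlContext.sql("SET -v")

// Deprecated key: a warning is logged and the value is applied to spark.sql.shuffle.partitions instead.
sqlContext.sql("SET mapred.reduce.tasks=20")
```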
@@ -17,9 +17,18 @@

package org.apache.spark.sql.test

import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/** A SQLContext that can be used for local testing. */
object TestSQLContext
extends SQLContext(new SparkContext("local", "TestSQLContext", new SparkConf()))
extends SQLContext(
new SparkContext(
"local[2]",
"TestSQLContext",
new SparkConf().set("spark.sql.testkey", "true"))) {

/** Fewer partitions to speed up testing. */
override private[spark] def numShufflePartitions: Int =
getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
}
11 changes: 10 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -17,16 +17,25 @@

package org.apache.spark.sql

import org.scalatest.FunSuiteLike

import org.apache.spark.sql.test._

/* Implicits */
import TestSQLContext._

class SQLConfSuite extends QueryTest {
class SQLConfSuite extends QueryTest with FunSuiteLike {

val testKey = "test.key.0"
val testVal = "test.val.0"

test("propagate from spark conf") {
// We create a new context here to avoid order dependence with other tests that might call
// clear().
val newContext = new SQLContext(TestSQLContext.sparkContext)
assert(newContext.getConf("spark.sql.testkey", "false") == "true")
}

test("programmatic ways of basic setting and getting") {
clear()
assert(getAllConfs.size === 0)