Commit

Provides Spark and Hive version in HiveThriftServer2 for branch-1.1
liancheng committed Nov 9, 2014
1 parent 0c2a244 commit d354161
Showing 7 changed files with 182 additions and 150 deletions.
16 changes: 7 additions & 9 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -20,10 +20,8 @@ package org.apache.spark.util
import java.io._
import java.net._
import java.nio.ByteBuffer
- import java.util.{Properties, Locale, Random, UUID}
- import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
-
- import org.apache.log4j.PropertyConfigurator
+ import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
+ import java.util.{Locale, Properties, Random, UUID}

import scala.collection.JavaConversions._
import scala.collection.Map
@@ -37,9 +35,10 @@ import com.google.common.io.{ByteStreams, Files}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.commons.lang3.SystemUtils
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
+ import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
- import tachyon.client.{TachyonFile,TachyonFS}
+ import tachyon.client.{TachyonFS, TachyonFile}

import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -952,8 +951,8 @@ private[spark] object Utils extends Logging {
*/
def getCallSite(skipClass: String => Boolean = coreExclusionFunction): CallSite = {
val trace = Thread.currentThread.getStackTrace()
- .filterNot { ste:StackTraceElement =>
- // When running under some profilers, the current stack trace might contain some bogus
+ .filterNot { ste:StackTraceElement =>
+ // When running under some profilers, the current stack trace might contain some bogus
// frames. This is intended to ensure that we don't crash in these situations by
// ignoring any frames that we can't examine.
(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))
@@ -1354,7 +1353,7 @@
}
}

- /**
+ /**
* Execute the given block, logging and re-throwing any uncaught exception.
* This is particularly useful for wrapping code that runs in a thread, to ensure
* that exceptions are printed, and to avoid having to catch Throwable.
@@ -1551,7 +1550,6 @@
"%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n")
PropertyConfigurator.configure(pro)
}
-
}

/**
Changes to SetCommand
@@ -48,43 +48,35 @@ case class SetCommand(
extends LeafNode with Command with Logging {

override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
- // Set value for key k.
- case (Some(k), Some(v)) =>
- if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
- logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+ // Configures the deprecated "mapred.reduce.tasks" property.
+ case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), Some(v)) =>
+ logWarning(
+ s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
- context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
- Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
- } else {
- context.setConf(k, v)
- Array(s"$k=$v")
- }
-
- // Query the value bound to key k.
- case (Some(k), _) =>
- // TODO (lian) This is just a workaround to make the Simba ODBC driver work.
- // Should remove this once we get the ODBC driver updated.
- if (k == "-v") {
- val hiveJars = Seq(
- "hive-exec-0.12.0.jar",
- "hive-service-0.12.0.jar",
- "hive-common-0.12.0.jar",
- "hive-hwi-0.12.0.jar",
- "hive-0.12.0.jar").mkString(":")
-
- Array(
- "system:java.class.path=" + hiveJars,
- "system:sun.java.command=shark.SharkServer2")
- }
- else {
- Array(s"$k=${context.getConf(k, "<undefined>")}")
- }
-
- // Query all key-value pairs that are set in the SQLConf of the context.
- case (None, None) =>
- context.getAllConfs.map { case (k, v) =>
- s"$k=$v"
- }.toSeq
+ context.setConf(SQLConf.SHUFFLE_PARTITIONS, v)
+ Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
+
+ // Configures a single property.
+ case (Some(k), Some(v)) =>
+ context.setConf(k, v)
+ Seq(s"$k=$v")
+
+ // Queries all key-value pairs that are set in the SQLConf of the context. Notice that different
+ // from Hive, here "SET -v" is an alias of "SET". (In Hive, "SET" returns all changed properties
+ // while "SET -v" returns all properties.)
+ case (Some("-v") | None, None) =>
+ context.getAllConfs.map { case (k, v) => s"$k=$v" }.toSeq
+
+ // Queries the deprecated "mapred.reduce.tasks" property.
+ case (Some(SQLConf.Deprecated.MAPRED_REDUCE_TASKS), None) =>
+ logWarning(
+ s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
+ s"showing ${SQLConf.SHUFFLE_PARTITIONS} instead.")
+ Seq(s"${SQLConf.SHUFFLE_PARTITIONS}=${context.numShufflePartitions}")
+
+ // Queries a single property.
+ case (Some(k), None) =>
+ Seq(s"$k=${context.getConf(k, "<undefined>")}")

case _ =>
throw new IllegalArgumentException()
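To show how the reworked pattern match behaves end to end, here is a rough sketch, not part of this commit, that drives the new SET handling from a Spark 1.1-era HiveContext. The object name, master URL, and the property names queried are assumptions chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

// Rough sketch of exercising the reworked SetCommand (Spark 1.1-era API assumed).
object SetCommandDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SetCommandDemo"))
    val hiveContext = new HiveContext(sc)

    // Configures a single property; the command returns one "key=value" row.
    hiveContext.sql("SET spark.sql.shuffle.partitions=10").collect().foreach(println)

    // The deprecated key is converted to spark.sql.shuffle.partitions, with a warning logged.
    hiveContext.sql("SET mapred.reduce.tasks=20").collect().foreach(println)

    // "SET" and "SET -v" are now aliases: both list every pair set in the SQLConf.
    hiveContext.sql("SET -v").collect().foreach(println)

    // Queries a single property; keys that were never set come back as "<undefined>".
    hiveContext.sql("SET some.unset.property").collect().foreach(println)

    sc.stop()
  }
}
```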
Changes to SparkSQLCLIService
@@ -17,18 +17,18 @@

package org.apache.spark.sql.hive.thriftserver

- import scala.collection.JavaConversions._
-
import java.io.IOException
import java.util.{List => JList}
import javax.security.auth.login.LoginException

+ import scala.collection.JavaConversions._
+
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
- import org.apache.hive.service.cli.CLIService
+ import org.apache.hive.service.cli._
import org.apache.hive.service.{AbstractService, Service, ServiceException}

import org.apache.spark.sql.hive.HiveContext
@@ -57,6 +57,15 @@ private[hive] class SparkSQLCLIService(hiveContext: HiveContext)

initCompositeService(hiveConf)
}
+
+ override def getInfo(sessionHandle: SessionHandle, getInfoType: GetInfoType): GetInfoValue = {
+ getInfoType match {
+ case GetInfoType.CLI_SERVER_NAME => new GetInfoValue("Spark SQL")
+ case GetInfoType.CLI_DBMS_NAME => new GetInfoValue("Spark SQL")
+ case GetInfoType.CLI_DBMS_VER => new GetInfoValue(hiveContext.sparkContext.version)
+ case _ => super.getInfo(sessionHandle, getInfoType)
+ }
+ }
}

private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
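A minimal JDBC sketch, also not part of the commit, of a client that would exercise the new getInfo override. The host, port, and credentials are placeholders, and whether a given DatabaseMetaData call is translated into a Thrift GetInfo request depends on the Hive JDBC driver version in use.

```scala
import java.sql.DriverManager

// Sketch of probing the Thrift server's identity over JDBC (connection details assumed).
object ServerInfoDemo {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    try {
      val meta = conn.getMetaData
      // With the override above, CLI_SERVER_NAME/CLI_DBMS_NAME report "Spark SQL" ...
      println(s"Product name:    ${meta.getDatabaseProductName}")
      // ... and CLI_DBMS_VER carries the Spark version (hiveContext.sparkContext.version).
      println(s"Product version: ${meta.getDatabaseProductVersion}")
    } finally {
      conn.close()
    }
  }
}
```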
Changes to SparkSQLEnv
@@ -17,10 +17,11 @@

package org.apache.spark.sql.hive.thriftserver

+ import scala.collection.JavaConversions._
+
import org.apache.spark.scheduler.StatsReportListener
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{Logging, SparkConf, SparkContext}
- import scala.collection.JavaConversions._

/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
@@ -31,8 +32,10 @@ private[hive] object SparkSQLEnv extends Logging {

def init() {
if (hiveContext == null) {
- sparkContext = new SparkContext(new SparkConf()
- .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
+ val sparkConf = new SparkConf()
+ .setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}")
+ .set("spark.sql.hive.version", "0.12.0")
+ sparkContext = new SparkContext(sparkConf)

sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext)
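Pinning spark.sql.hive.version in the SparkConf is intended to let clients ask the Thrift server which Hive version it was built against, for example with a plain SET query. A hedged sketch of that round trip follows; the connection details are assumptions, and the result is expected to be the "key=value" string rendered by SetCommand.

```scala
import java.sql.DriverManager

// Sketch: ask a running Thrift server which Hive version it claims (connection details assumed).
object HiveVersionProbe {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()
    // Expected to print something like "spark.sql.hive.version=0.12.0".
    val rs = stmt.executeQuery("SET spark.sql.hive.version")
    while (rs.next()) {
      println(rs.getString(1))
    }
    conn.close()
  }
}
```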
