[SPARK-29724][SPARK-29726][WEBUI][SQL] Support JDBC/ODBC tab for HistoryServer WebUI

### What changes were proposed in this pull request?

Support the JDBC/ODBC tab in the HistoryServer WebUI. Currently the JDBC/ODBC tab of Thrift Server applications cannot be accessed from the History Server. This PR makes two main changes (see the sketch after this list):
1. Refactor the existing Thrift Server listener to write its state to the kvstore, so the tab can be rebuilt from event logs.
2. Add a History Server plugin for the Thrift Server listener and tab.
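
As a rough, non-authoritative sketch of what such a plugin looks like (the real one added by this PR is `HiveThriftServer2HistoryServerPlugin`; the `createListeners` hook and the constructor shapes below are assumptions, since the hunks further down only show `setupUI` and `displayOrder`):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.ui.SparkUI

// Sketch only: illustrates the plugin shape, not the exact code in this commit.
class ExampleThriftServerHistoryPlugin extends AppHistoryServerPlugin {

  // Replay: turn logged Thrift Server events back into kvstore entries.
  override def createListeners(
      conf: SparkConf,
      store: ElementTrackingStore): Seq[SparkListener] = {
    // Assumed constructor shape; the live path in this PR passes
    // (kvStore, sc.conf, Some(server)).
    Seq(new HiveThriftServer2Listener(store, conf, None))
  }

  // Rebuild the JDBC/ODBC tab on top of the replayed kvstore data.
  override def setupUI(ui: SparkUI): Unit = {
    val store = new HiveThriftServer2AppStatusStore(ui.store.store, None)
    new ThriftServerTab(store, ui)
  }

  // After the SQL tab, which takes displayOrder 0 in this PR.
  override def displayOrder: Int = 1
}
```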

### Why are the changes needed?
Users can access the Thrift Server tab from the History Server for both running and finished applications.

### Does this PR introduce any user-facing change?
Yes. The WebUI served by the History Server now includes the JDBC/ODBC tab.

### How was this patch tested?
Added unit tests and tested manually:
1. Restart the Thrift Server and the History Server
```
sbin/stop-thriftserver.sh
sbin/stop-history-server.sh
sbin/start-thriftserver.sh
sbin/start-history-server.sh
```
2. Launch beeline
`bin/beeline -u jdbc:hive2://localhost:10000`

3. Run a few queries from the beeline prompt (e.g. `show tables;`)

4. Go to the JDBC/ODBC page of the WebUI served by the History Server (by default at http://localhost:18080):

![image](https://user-images.githubusercontent.com/23054875/68365501-cf013700-0156-11ea-84b4-fda8008c92c4.png)

Closes #26378 from shahidki31/ThriftKVStore.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
shahidki31 authored and gengliangwang committed Nov 30, 2019
1 parent 9351e3e commit b182ed8
Showing 23 changed files with 913 additions and 287 deletions.
```diff
@@ -352,10 +352,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, secManager,
       app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
       attempt.info.startTime.getTime(), attempt.info.appSparkVersion)
-    loadPlugins().foreach(_.setupUI(ui))
+    // place the tab in UI based on the display order
+    loadPlugins().toSeq.sortBy(_.displayOrder).foreach(_.setupUI(ui))
 
     val loadedUI = LoadedAppUI(ui)
     synchronized {
       activeUIs((appId, attemptId)) = loadedUI
     }
```

```diff
@@ -35,4 +35,9 @@ private[spark] trait AppHistoryServerPlugin {
    * Sets up UI of this plugin to rebuild the history UI.
    */
   def setupUI(ui: SparkUI): Unit
+
+  /**
+   * The position of a plugin tab relative to the other plugin tabs in the history UI.
+   */
+  def displayOrder: Int = Integer.MAX_VALUE
 }
```
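
A self-contained illustration of the new ordering hook (stand-in types, not Spark classes): plugins that don't override `displayOrder` keep the `Integer.MAX_VALUE` default and therefore sort last when the history UI is rebuilt.

```scala
// Stand-in trait mirroring just the ordering hook added above.
trait OrderedPlugin {
  def name: String
  def displayOrder: Int = Integer.MAX_VALUE
}

object DisplayOrderDemo extends App {
  val sql = new OrderedPlugin { val name = "SQL"; override val displayOrder = 0 }
  val jdbc = new OrderedPlugin { val name = "JDBC/ODBC"; override val displayOrder = 1 }
  val custom = new OrderedPlugin { val name = "Custom" } // keeps MAX_VALUE

  // Mirrors FsHistoryProvider: sort by displayOrder before setting up tabs.
  println(Seq(custom, jdbc, sql).sortBy(_.displayOrder).map(_.name))
  // List(SQL, JDBC/ODBC, Custom)
}
```
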
```diff
@@ -33,4 +33,7 @@ class SQLHistoryServerPlugin extends AppHistoryServerPlugin {
       new SQLTab(sqlStatusStore, ui)
     }
   }
+
+  override def displayOrder: Int = 0
 }
```

```diff
@@ -0,0 +1 @@
+org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2HistoryServerPlugin
```
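
Judging by its content, this new one-line file is a `java.util.ServiceLoader` registration (such files live under `META-INF/services/`; the scrape dropped the file path): the History Server discovers `AppHistoryServerPlugin` implementations on the classpath rather than hard-coding them. A minimal sketch of that discovery pattern, assuming the trait's package from Spark's history module:

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.deploy.history.AppHistoryServerPlugin

// Sketch of ServiceLoader discovery: every class named in a
// META-INF/services/org.apache.spark.deploy.history.AppHistoryServerPlugin
// resource is instantiated through its no-arg constructor.
def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
  ServiceLoader.load(
    classOf[AppHistoryServerPlugin],
    Thread.currentThread().getContextClassLoader).asScala
}
```
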
```diff
@@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.thriftserver
 import java.util.Locale
 import java.util.concurrent.atomic.AtomicBoolean
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService}
@@ -32,12 +29,11 @@ import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI.UI_ENABLED
-import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
-import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.hive.thriftserver.ui._
+import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
@@ -47,6 +43,7 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
 object HiveThriftServer2 extends Logging {
   var uiTab: Option[ThriftServerTab] = None
   var listener: HiveThriftServer2Listener = _
+  var eventManager: HiveThriftServer2EventManager = _
 
   /**
    * :: DeveloperApi ::
@@ -62,14 +59,21 @@ object HiveThriftServer2 extends Logging {
 
     server.init(executionHive.conf)
     server.start()
-    listener = new HiveThriftServer2Listener(server, sqlContext.conf)
-    sqlContext.sparkContext.addSparkListener(listener)
-    uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) {
-      Some(new ThriftServerTab(sqlContext.sparkContext))
+    createListenerAndUI(server, sqlContext.sparkContext)
+    server
+  }
+
+  private def createListenerAndUI(server: HiveThriftServer2, sc: SparkContext): Unit = {
+    val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]
+    eventManager = new HiveThriftServer2EventManager(sc)
+    listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server))
+    sc.listenerBus.addToStatusQueue(listener)
+    uiTab = if (sc.getConf.get(UI_ENABLED)) {
+      Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
+        ThriftServerTab.getSparkUI(sc)))
     } else {
       None
     }
-    server
   }
 
   def main(args: Array[String]): Unit = {
@@ -101,13 +105,7 @@ object HiveThriftServer2 extends Logging {
     server.init(executionHive.conf)
     server.start()
     logInfo("HiveThriftServer2 started")
-    listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf)
-    SparkSQLEnv.sparkContext.addSparkListener(listener)
-    uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) {
-      Some(new ThriftServerTab(SparkSQLEnv.sparkContext))
-    } else {
-      None
-    }
+    createListenerAndUI(server, SparkSQLEnv.sparkContext)
     // If application was killed before HiveThriftServer2 start successfully then SparkSubmit
     // process can not exit, so check whether if SparkContext was stopped.
     if (SparkSQLEnv.sparkContext.stopped.get()) {
@@ -121,179 +119,10 @@ object HiveThriftServer2 extends Logging {
     }
   }
 
-  private[thriftserver] class SessionInfo(
-      val sessionId: String,
-      val startTimestamp: Long,
-      val ip: String,
-      val userName: String) {
-    var finishTimestamp: Long = 0L
-    var totalExecution: Int = 0
-    def totalTime: Long = {
-      if (finishTimestamp == 0L) {
-        System.currentTimeMillis - startTimestamp
-      } else {
-        finishTimestamp - startTimestamp
-      }
-    }
-  }
-
-  private[thriftserver] object ExecutionState extends Enumeration {
-    val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value
-    type ExecutionState = Value
-  }
-
-  private[thriftserver] class ExecutionInfo(
-      val statement: String,
-      val sessionId: String,
-      val startTimestamp: Long,
-      val userName: String) {
-    var finishTimestamp: Long = 0L
-    var closeTimestamp: Long = 0L
-    var executePlan: String = ""
-    var detail: String = ""
-    var state: ExecutionState.Value = ExecutionState.STARTED
-    val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
-    var groupId: String = ""
-    def totalTime(endTime: Long): Long = {
-      if (endTime == 0L) {
-        System.currentTimeMillis - startTimestamp
-      } else {
-        endTime - startTimestamp
-      }
-    }
-  }
-
-
-  /**
-   * An inner sparkListener called in sc.stop to clean up the HiveThriftServer2
-   */
-  private[thriftserver] class HiveThriftServer2Listener(
-      val server: HiveServer2,
-      val conf: SQLConf) extends SparkListener {
-
-    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
-      server.stop()
-    }
-    private val sessionList = new mutable.LinkedHashMap[String, SessionInfo]
-    private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo]
-    private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT)
-    private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)
-
-    def getOnlineSessionNum: Int = synchronized {
-      sessionList.count(_._2.finishTimestamp == 0)
-    }
-
-    def isExecutionActive(execInfo: ExecutionInfo): Boolean = {
-      !(execInfo.state == ExecutionState.FAILED ||
-        execInfo.state == ExecutionState.CANCELED ||
-        execInfo.state == ExecutionState.CLOSED)
-    }
-
-    /**
-     * When an error or a cancellation occurs, we set the finishTimestamp of the statement.
-     * Therefore, when we count the number of running statements, we need to exclude errors and
-     * cancellations and count all statements that have not been closed so far.
-     */
-    def getTotalRunning: Int = synchronized {
-      executionList.count {
-        case (_, v) => isExecutionActive(v)
-      }
-    }
-
-    def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq }
-
-    def getSession(sessionId: String): Option[SessionInfo] = synchronized {
-      sessionList.get(sessionId)
-    }
-
-    def getExecutionList: Seq[ExecutionInfo] = synchronized { executionList.values.toSeq }
-
-    override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
-      for {
-        props <- Option(jobStart.properties)
-        groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
-        (_, info) <- executionList if info.groupId == groupId
-      } {
-        info.jobId += jobStart.jobId.toString
-        info.groupId = groupId
-      }
-    }
-
-    def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = {
-      synchronized {
-        val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName)
-        sessionList.put(sessionId, info)
-        trimSessionIfNecessary()
-      }
-    }
-
-    def onSessionClosed(sessionId: String): Unit = synchronized {
-      sessionList(sessionId).finishTimestamp = System.currentTimeMillis
-      trimSessionIfNecessary()
-    }
-
-    def onStatementStart(
-        id: String,
-        sessionId: String,
-        statement: String,
-        groupId: String,
-        userName: String = "UNKNOWN"): Unit = synchronized {
-      val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName)
-      info.state = ExecutionState.STARTED
-      executionList.put(id, info)
-      trimExecutionIfNecessary()
-      sessionList(sessionId).totalExecution += 1
-      executionList(id).groupId = groupId
-    }
-
-    def onStatementParsed(id: String, executionPlan: String): Unit = synchronized {
-      executionList(id).executePlan = executionPlan
-      executionList(id).state = ExecutionState.COMPILED
-    }
-
-    def onStatementCanceled(id: String): Unit = synchronized {
-      executionList(id).finishTimestamp = System.currentTimeMillis
-      executionList(id).state = ExecutionState.CANCELED
-      trimExecutionIfNecessary()
-    }
-
-    def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized {
-      executionList(id).finishTimestamp = System.currentTimeMillis
-      executionList(id).detail = errorMsg
-      executionList(id).state = ExecutionState.FAILED
-      trimExecutionIfNecessary()
-    }
-
-    def onStatementFinish(id: String): Unit = synchronized {
-      executionList(id).finishTimestamp = System.currentTimeMillis
-      executionList(id).state = ExecutionState.FINISHED
-      trimExecutionIfNecessary()
-    }
-
-    def onOperationClosed(id: String): Unit = synchronized {
-      executionList(id).closeTimestamp = System.currentTimeMillis
-      executionList(id).state = ExecutionState.CLOSED
-    }
-
-    private def trimExecutionIfNecessary() = {
-      if (executionList.size > retainedStatements) {
-        val toRemove = math.max(retainedStatements / 10, 1)
-        executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
-          executionList.remove(s._1)
-        }
-      }
-    }
-
-    private def trimSessionIfNecessary() = {
-      if (sessionList.size > retainedSessions) {
-        val toRemove = math.max(retainedSessions / 10, 1)
-        sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s =>
-          sessionList.remove(s._1)
-        }
-      }
-
-    }
-  }
 }
 
 private[hive] class HiveThriftServer2(sqlContext: SQLContext)
```

```diff
@@ -77,7 +77,7 @@ private[hive] class SparkExecuteStatementOperation(
     // RDDs will be cleaned automatically upon garbage collection.
     logInfo(s"Close statement with $statementId")
     cleanup(OperationState.CLOSED)
-    HiveThriftServer2.listener.onOperationClosed(statementId)
+    HiveThriftServer2.eventManager.onOperationClosed(statementId)
   }
 
   def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = {
@@ -195,7 +195,7 @@ private[hive] class SparkExecuteStatementOperation(
     setState(OperationState.PENDING)
     statementId = UUID.randomUUID().toString
     logInfo(s"Submitting query '$statement' with $statementId")
-    HiveThriftServer2.listener.onStatementStart(
+    HiveThriftServer2.eventManager.onStatementStart(
       statementId,
       parentSession.getSessionHandle.getSessionId.toString,
      statement,
@@ -245,14 +245,14 @@
       case rejected: RejectedExecutionException =>
         logError("Error submitting query in background, query rejected", rejected)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
+        HiveThriftServer2.eventManager.onStatementError(
           statementId, rejected.getMessage, SparkUtils.exceptionString(rejected))
         throw new HiveSQLException("The background threadpool cannot accept" +
           " new task for execution, please retry the operation", rejected)
       case NonFatal(e) =>
         logError(s"Error executing query in background", e)
         setState(OperationState.ERROR)
-        HiveThriftServer2.listener.onStatementError(
+        HiveThriftServer2.eventManager.onStatementError(
           statementId, e.getMessage, SparkUtils.exceptionString(e))
         throw new HiveSQLException(e)
     }
@@ -284,7 +284,8 @@
             "in this session.")
         case _ =>
       }
-      HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
+      HiveThriftServer2.eventManager.onStatementParsed(statementId,
+        result.queryExecution.toString())
       iter = {
         if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) {
           resultList = None
@@ -315,12 +316,12 @@
         setState(OperationState.ERROR)
         e match {
           case hiveException: HiveSQLException =>
-            HiveThriftServer2.listener.onStatementError(
+            HiveThriftServer2.eventManager.onStatementError(
               statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException))
             throw hiveException
           case _ =>
             val root = ExceptionUtils.getRootCause(e)
-            HiveThriftServer2.listener.onStatementError(
+            HiveThriftServer2.eventManager.onStatementError(
               statementId, root.getMessage, SparkUtils.exceptionString(root))
             throw new HiveSQLException("Error running query: " + root.toString, root)
         }
@@ -329,7 +330,7 @@
       synchronized {
         if (!getStatus.getState.isTerminal) {
           setState(OperationState.FINISHED)
-          HiveThriftServer2.listener.onStatementFinish(statementId)
+          HiveThriftServer2.eventManager.onStatementFinish(statementId)
        }
      }
      sqlContext.sparkContext.clearJobGroup()
@@ -341,7 +342,7 @@
     if (!getStatus.getState.isTerminal) {
       logInfo(s"Cancel query with $statementId")
       cleanup(OperationState.CANCELED)
-      HiveThriftServer2.listener.onStatementCanceled(statementId)
+      HiveThriftServer2.eventManager.onStatementCanceled(statementId)
     }
   }
 }
```
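
The repeated `listener` → `eventManager` substitutions above capture the split this PR introduces: operation code now posts events instead of mutating listener state directly, and the kvstore-backed `HiveThriftServer2Listener` consumes those events. A hedged sketch of that shape (the event case-class name and the `listenerBus` access are assumptions, not necessarily the commit's exact code):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.SparkListenerEvent

// Assumed event type: one case class per statement lifecycle transition.
case class SparkListenerThriftServerOperationStart(
    id: String,
    sessionId: String,
    statement: String,
    groupId: String,
    startTime: Long,
    userName: String) extends SparkListenerEvent

// Sketch: the event manager only posts to the live listener bus
// (listenerBus is private[spark] in real Spark — illustration only);
// HiveThriftServer2Listener turns these events into kvstore writes.
class HiveThriftServer2EventManager(sc: SparkContext) {
  def onStatementStart(
      id: String,
      sessionId: String,
      statement: String,
      groupId: String,
      userName: String = "UNKNOWN"): Unit = {
    sc.listenerBus.post(SparkListenerThriftServerOperationStart(
      id, sessionId, statement, groupId, System.currentTimeMillis(), userName))
  }
}
```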
