From b182ed83f6c8d06c54ce169754c31c170088d08c Mon Sep 17 00:00:00 2001
From: shahid
Date: Fri, 29 Nov 2019 19:44:31 -0800
Subject: [PATCH] [SPARK-29724][SPARK-29726][WEBUI][SQL] Support JDBC/ODBC tab for HistoryServer WebUI

### What changes were proposed in this pull request?
Support the JDBC/ODBC tab in the HistoryServer WebUI. Currently, the JDBC/ODBC tab of Thrift server applications cannot be accessed from the History Server. This PR makes two main changes:
1. Refactor the existing Thrift server listener to keep its state in the KVStore.
2. Add a History Server plugin for the Thrift server listener and tab.

### Why are the changes needed?
Users can access the Thrift server tab from the History Server for both running and finished applications.

### Does this PR introduce any user-facing change?
Yes. The JDBC/ODBC tab becomes available in the WebUI served by the History Server.

### How was this patch tested?
Added unit tests and tested manually:
1. Restart the Thrift server and the History Server:
```
sbin/stop-thriftserver.sh
sbin/stop-historyserver.sh
sbin/start-thriftserver.sh
sbin/start-historyserver.sh
```
2. Launch beeline: `bin/beeline -u jdbc:hive2://localhost:10000`
3. Run queries, then go to the JDBC/ODBC page of the WebUI from the History Server:

![image](https://user-images.githubusercontent.com/23054875/68365501-cf013700-0156-11ea-84b4-fda8008c92c4.png)

Closes #26378 from shahidki31/ThriftKVStore.

Authored-by: shahid
Signed-off-by: Gengliang Wang
---
 .../deploy/history/FsHistoryProvider.scala    |   5 +-
 .../spark/status/AppHistoryServerPlugin.scala |   5 +
 .../execution/ui/SQLHistoryServerPlugin.scala |   3 +
 ...apache.spark.status.AppHistoryServerPlugin |   1 +
 .../hive/thriftserver/HiveThriftServer2.scala | 203 +----------
 .../SparkExecuteStatementOperation.scala      |  19 +-
 .../SparkGetCatalogsOperation.scala           |  10 +-
 .../SparkGetColumnsOperation.scala            |  10 +-
 .../SparkGetFunctionsOperation.scala          |  10 +-
 .../SparkGetSchemasOperation.scala            |  10 +-
 .../SparkGetTableTypesOperation.scala         |  10 +-
 .../SparkGetTablesOperation.scala             |  10 +-
 .../SparkGetTypeInfoOperation.scala           |  10 +-
 .../thriftserver/SparkSQLSessionManager.scala |   4 +-
 .../ui/HiveThriftServer2AppStatusStore.scala  | 132 ++++++++
 .../ui/HiveThriftServer2EventManager.scala    | 113 +++++++
 ...HiveThriftServer2HistoryServerPlugin.scala |  40 +++
 .../ui/HiveThriftServer2Listener.scala        | 315 ++++++++++++++++++
 .../thriftserver/ui/ThriftServerPage.scala    |  30 +-
 .../ui/ThriftServerSessionPage.scala          |  13 +-
 .../thriftserver/ui/ThriftServerTab.scala     |  15 +-
 .../ui/HiveThriftServer2ListenerSuite.scala   | 164 +++++++++
 .../ui/ThriftServerPageSuite.scala            |  68 ++--
 23 files changed, 913 insertions(+), 287 deletions(-)
 create mode 100644 sql/hive-thriftserver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin
 create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala
 create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala
 create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala
 create mode 100644 sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
 create mode 100644 sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
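For context on the mechanism this patch builds on: History Server tabs are contributed by `AppHistoryServerPlugin` implementations that Spark discovers through `java.util.ServiceLoader`, using a `META-INF/services` registration file like the one added in this patch. A minimal sketch of such a plugin against the trait as extended here — the class name and method bodies are illustrative, not part of the change:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore}
import org.apache.spark.ui.SparkUI

// Hypothetical plugin; HiveThriftServer2HistoryServerPlugin below follows this shape.
class ExampleHistoryServerPlugin extends AppHistoryServerPlugin {

  // Listeners that replay the application's event log into the KVStore.
  override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] =
    Seq.empty

  // Called while rebuilding an application's UI; attach tabs to `ui` here.
  override def setupUI(ui: SparkUI): Unit = ()

  // New in this patch: plugins are set up in ascending displayOrder, so the SQL
  // tab (0) precedes the JDBC/ODBC tab (1); the default Integer.MAX_VALUE keeps
  // plugins that do not care about ordering at the end.
  override def displayOrder: Int = 2
}
```

Registration is one line in `META-INF/services/org.apache.spark.status.AppHistoryServerPlugin` naming the implementation class.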
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index e2f3314bc8595..a3776b3ad756d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -352,10 +352,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val ui = SparkUI.create(None, new HistoryAppStatusStore(conf, kvstore), conf, secManager, app.info.name, HistoryServer.getAttemptURI(appId, attempt.info.attemptId), attempt.info.startTime.getTime(), attempt.info.appSparkVersion) - loadPlugins().foreach(_.setupUI(ui)) - val loadedUI = LoadedAppUI(ui) + // place the tab in UI based on the display order + loadPlugins().toSeq.sortBy(_.displayOrder).foreach(_.setupUI(ui)) + val loadedUI = LoadedAppUI(ui) synchronized { activeUIs((appId, attemptId)) = loadedUI }
diff --git a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala index d144a0e998fa1..2e9a31d5ac69c 100644 --- a/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala +++ b/core/src/main/scala/org/apache/spark/status/AppHistoryServerPlugin.scala @@ -35,4 +35,9 @@ private[spark] trait AppHistoryServerPlugin { * Sets up UI of this plugin to rebuild the history UI. */ def setupUI(ui: SparkUI): Unit + + /** + * The position of a plugin tab relative to the other plugin tabs in the history UI. + */ + def displayOrder: Int = Integer.MAX_VALUE }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala index 522d0cf79bffa..5bf1ce5eb8a90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLHistoryServerPlugin.scala @@ -33,4 +33,7 @@ class SQLHistoryServerPlugin extends AppHistoryServerPlugin { new SQLTab(sqlStatusStore, ui) } } + + override def displayOrder: Int = 0 } +
diff --git a/sql/hive-thriftserver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin b/sql/hive-thriftserver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin new file mode 100644 index 0000000000000..96d990372ee4c --- /dev/null +++ b/sql/hive-thriftserver/src/main/resources/META-INF/services/org.apache.spark.status.AppHistoryServerPlugin @@ -0,0 +1 @@ +org.apache.spark.sql.hive.thriftserver.ui.HiveThriftServer2HistoryServerPlugin
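The next file carries the core of change 1: the old in-process listener is split into an event producer and a KVStore-backed consumer. Condensed, the wiring added in `createListenerAndUI` looks like the sketch below, assuming a running `SparkContext` `sc` and a started `HiveServer2` `server` (a paraphrase of the diff, not verbatim code):

```scala
// The live UI's app status store is backed by an ElementTrackingStore.
val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore]

// Producer: converts session/operation callbacks into SparkListenerEvents
// and posts them on the live listener bus.
eventManager = new HiveThriftServer2EventManager(sc)

// Consumer: turns those events into SessionInfo/ExecutionInfo rows in the store.
listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server))
sc.listenerBus.addToStatusQueue(listener)

// Read side: the tab renders from the store, so the same pages serve the live
// UI and the History Server, where the listener is replayed from the event log.
uiTab = Some(new ThriftServerTab(
  new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
  ThriftServerTab.getSparkUI(sc)))
```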
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 9517a599be633..f15193b0dc3cc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -20,9 +20,6 @@ package org.apache.spark.sql.hive.thriftserver import java.util.Locale import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hive.service.cli.thrift.{ThriftBinaryCLIService, ThriftHttpCLIService} @@ -32,12 +29,11 @@ import org.apache.spark.SparkContext import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.Logging import org.apache.spark.internal.config.UI.UI_ENABLED -import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerJobStart} import org.apache.spark.sql.SQLContext import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.hive.thriftserver.ui._ +import org.apache.spark.status.ElementTrackingStore import org.apache.spark.util.{ShutdownHookManager, Utils} /** @@ -47,6 +43,7 @@ import org.apache.spark.util.{ShutdownHookManager, Utils} object HiveThriftServer2 extends Logging { var uiTab: Option[ThriftServerTab] = None var listener: HiveThriftServer2Listener = _ + var eventManager: HiveThriftServer2EventManager = _ /** * :: DeveloperApi :: @@ -62,14 +59,21 @@ object HiveThriftServer2 extends Logging { server.init(executionHive.conf) server.start() - listener = new HiveThriftServer2Listener(server, sqlContext.conf) - sqlContext.sparkContext.addSparkListener(listener) - uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) { - Some(new ThriftServerTab(sqlContext.sparkContext)) + createListenerAndUI(server, sqlContext.sparkContext) + server + } + + private def createListenerAndUI(server: HiveThriftServer2, sc: SparkContext): Unit = { + val kvStore = sc.statusStore.store.asInstanceOf[ElementTrackingStore] + eventManager = new HiveThriftServer2EventManager(sc) + listener = new HiveThriftServer2Listener(kvStore, sc.conf, Some(server)) + sc.listenerBus.addToStatusQueue(listener) + uiTab = if (sc.getConf.get(UI_ENABLED)) { + Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)), + ThriftServerTab.getSparkUI(sc))) } else { None } - server } def main(args: Array[String]): Unit = { @@ -101,13 +105,7 @@ object HiveThriftServer2 extends Logging { server.init(executionHive.conf) server.start() logInfo("HiveThriftServer2 started") - listener = new HiveThriftServer2Listener(server, SparkSQLEnv.sqlContext.conf) - SparkSQLEnv.sparkContext.addSparkListener(listener) - uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) { - Some(new ThriftServerTab(SparkSQLEnv.sparkContext)) - } else { - None - } + createListenerAndUI(server, SparkSQLEnv.sparkContext) // If application was killed before HiveThriftServer2 start successfully then SparkSubmit // process can not exit, so check whether if SparkContext was stopped.
if (SparkSQLEnv.sparkContext.stopped.get()) { @@ -121,179 +119,10 @@ object HiveThriftServer2 extends Logging { } } - private[thriftserver] class SessionInfo( - val sessionId: String, - val startTimestamp: Long, - val ip: String, - val userName: String) { - var finishTimestamp: Long = 0L - var totalExecution: Int = 0 - def totalTime: Long = { - if (finishTimestamp == 0L) { - System.currentTimeMillis - startTimestamp - } else { - finishTimestamp - startTimestamp - } - } - } - private[thriftserver] object ExecutionState extends Enumeration { val STARTED, COMPILED, CANCELED, FAILED, FINISHED, CLOSED = Value type ExecutionState = Value } - - private[thriftserver] class ExecutionInfo( - val statement: String, - val sessionId: String, - val startTimestamp: Long, - val userName: String) { - var finishTimestamp: Long = 0L - var closeTimestamp: Long = 0L - var executePlan: String = "" - var detail: String = "" - var state: ExecutionState.Value = ExecutionState.STARTED - val jobId: ArrayBuffer[String] = ArrayBuffer[String]() - var groupId: String = "" - def totalTime(endTime: Long): Long = { - if (endTime == 0L) { - System.currentTimeMillis - startTimestamp - } else { - endTime - startTimestamp - } - } - } - - - /** - * An inner sparkListener called in sc.stop to clean up the HiveThriftServer2 - */ - private[thriftserver] class HiveThriftServer2Listener( - val server: HiveServer2, - val conf: SQLConf) extends SparkListener { - - override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { - server.stop() - } - private val sessionList = new mutable.LinkedHashMap[String, SessionInfo] - private val executionList = new mutable.LinkedHashMap[String, ExecutionInfo] - private val retainedStatements = conf.getConf(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT) - private val retainedSessions = conf.getConf(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT) - - def getOnlineSessionNum: Int = synchronized { - sessionList.count(_._2.finishTimestamp == 0) - } - - def isExecutionActive(execInfo: ExecutionInfo): Boolean = { - !(execInfo.state == ExecutionState.FAILED || - execInfo.state == ExecutionState.CANCELED || - execInfo.state == ExecutionState.CLOSED) - } - - /** - * When an error or a cancellation occurs, we set the finishTimestamp of the statement. - * Therefore, when we count the number of running statements, we need to exclude errors and - * cancellations and count all statements that have not been closed so far. 
- */ - def getTotalRunning: Int = synchronized { - executionList.count { - case (_, v) => isExecutionActive(v) - } - } - - def getSessionList: Seq[SessionInfo] = synchronized { sessionList.values.toSeq } - - def getSession(sessionId: String): Option[SessionInfo] = synchronized { - sessionList.get(sessionId) - } - - def getExecutionList: Seq[ExecutionInfo] = synchronized { executionList.values.toSeq } - - override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized { - for { - props <- Option(jobStart.properties) - groupId <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) - (_, info) <- executionList if info.groupId == groupId - } { - info.jobId += jobStart.jobId.toString - info.groupId = groupId - } - } - - def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = { - synchronized { - val info = new SessionInfo(sessionId, System.currentTimeMillis, ip, userName) - sessionList.put(sessionId, info) - trimSessionIfNecessary() - } - } - - def onSessionClosed(sessionId: String): Unit = synchronized { - sessionList(sessionId).finishTimestamp = System.currentTimeMillis - trimSessionIfNecessary() - } - - def onStatementStart( - id: String, - sessionId: String, - statement: String, - groupId: String, - userName: String = "UNKNOWN"): Unit = synchronized { - val info = new ExecutionInfo(statement, sessionId, System.currentTimeMillis, userName) - info.state = ExecutionState.STARTED - executionList.put(id, info) - trimExecutionIfNecessary() - sessionList(sessionId).totalExecution += 1 - executionList(id).groupId = groupId - } - - def onStatementParsed(id: String, executionPlan: String): Unit = synchronized { - executionList(id).executePlan = executionPlan - executionList(id).state = ExecutionState.COMPILED - } - - def onStatementCanceled(id: String): Unit = synchronized { - executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).state = ExecutionState.CANCELED - trimExecutionIfNecessary() - } - - def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = synchronized { - executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).detail = errorMsg - executionList(id).state = ExecutionState.FAILED - trimExecutionIfNecessary() - } - - def onStatementFinish(id: String): Unit = synchronized { - executionList(id).finishTimestamp = System.currentTimeMillis - executionList(id).state = ExecutionState.FINISHED - trimExecutionIfNecessary() - } - - def onOperationClosed(id: String): Unit = synchronized { - executionList(id).closeTimestamp = System.currentTimeMillis - executionList(id).state = ExecutionState.CLOSED - } - - private def trimExecutionIfNecessary() = { - if (executionList.size > retainedStatements) { - val toRemove = math.max(retainedStatements / 10, 1) - executionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s => - executionList.remove(s._1) - } - } - } - - private def trimSessionIfNecessary() = { - if (sessionList.size > retainedSessions) { - val toRemove = math.max(retainedSessions / 10, 1) - sessionList.filter(_._2.finishTimestamp != 0).take(toRemove).foreach { s => - sessionList.remove(s._1) - } - } - - } - } } private[hive] class HiveThriftServer2(sqlContext: SQLContext) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 68197a9de8566..76d07848f79a9 100644 --- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -77,7 +77,7 @@ private[hive] class SparkExecuteStatementOperation( // RDDs will be cleaned automatically upon garbage collection. logInfo(s"Close statement with $statementId") cleanup(OperationState.CLOSED) - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int): Unit = { @@ -195,7 +195,7 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.PENDING) statementId = UUID.randomUUID().toString logInfo(s"Submitting query '$statement' with $statementId") - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, statement, @@ -245,14 +245,14 @@ private[hive] class SparkExecuteStatementOperation( case rejected: RejectedExecutionException => logError("Error submitting query in background, query rejected", rejected) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, rejected.getMessage, SparkUtils.exceptionString(rejected)) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, e.getMessage, SparkUtils.exceptionString(e)) throw new HiveSQLException(e) } @@ -284,7 +284,8 @@ private[hive] class SparkExecuteStatementOperation( "in this session.") case _ => } - HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString()) + HiveThriftServer2.eventManager.onStatementParsed(statementId, + result.queryExecution.toString()) iter = { if (sqlContext.getConf(SQLConf.THRIFTSERVER_INCREMENTAL_COLLECT.key).toBoolean) { resultList = None @@ -315,12 +316,12 @@ private[hive] class SparkExecuteStatementOperation( setState(OperationState.ERROR) e match { case hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error running query: " + root.toString, root) } @@ -329,7 +330,7 @@ private[hive] class SparkExecuteStatementOperation( synchronized { if (!getStatus.getState.isTerminal) { setState(OperationState.FINISHED) - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } } sqlContext.sparkContext.clearJobGroup() @@ -341,7 +342,7 @@ private[hive] class SparkExecuteStatementOperation( if (!getStatus.getState.isTerminal) { logInfo(s"Cancel query with $statementId") cleanup(OperationState.CANCELED) - HiveThriftServer2.listener.onStatementCanceled(statementId) + HiveThriftServer2.eventManager.onStatementCanceled(statementId) 
} } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala index 6c8a5b00992da..2945cfd200e46 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetCatalogsOperation.scala @@ -44,7 +44,7 @@ private[hive] class SparkGetCatalogsOperation( override def close(): Unit = { super.close() - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } override def runInternal(): Unit = { @@ -56,7 +56,7 @@ private[hive] class SparkGetCatalogsOperation( val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -74,16 +74,16 @@ private[hive] class SparkGetCatalogsOperation( setState(OperationState.ERROR) e match { case hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error getting catalogs: " + root.toString, root) } } - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala index f845a2285b9a3..ff7cbfeae13be 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala @@ -63,7 +63,7 @@ private[hive] class SparkGetColumnsOperation( override def close(): Unit = { super.close() - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } override def runInternal(): Unit = { @@ -78,7 +78,7 @@ private[hive] class SparkGetColumnsOperation( val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader Thread.currentThread().setContextClassLoader(executionHiveClassLoader) - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -135,17 +135,17 @@ private[hive] class SparkGetColumnsOperation( setState(OperationState.ERROR) e match { case hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + 
HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error getting columns: " + root.toString, root) } } - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } private def addToRowSet( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala index 1cdd8918421bb..d9c12b6ca9e64 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetFunctionsOperation.scala @@ -54,7 +54,7 @@ private[hive] class SparkGetFunctionsOperation( override def close(): Unit = { super.close() - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } override def runInternal(): Unit = { @@ -81,7 +81,7 @@ private[hive] class SparkGetFunctionsOperation( authorizeMetaGets(HiveOperationType.GET_FUNCTIONS, privObjs, cmdStr) } - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -110,16 +110,16 @@ private[hive] class SparkGetFunctionsOperation( setState(OperationState.ERROR) e match { case hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error getting functions: " + root.toString, root) } } - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala index 928610a6bcff9..db19880d1b99f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala @@ -50,7 +50,7 @@ private[hive] class SparkGetSchemasOperation( override def close(): Unit = { super.close() - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } override def runInternal(): Unit = { @@ -68,7 +68,7 @@ private[hive] class SparkGetSchemasOperation( authorizeMetaGets(HiveOperationType.GET_TABLES, null, cmdStr) } - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -93,16 +93,16 @@ private[hive] class SparkGetSchemasOperation( setState(OperationState.ERROR) e match { case hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, 
hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error getting schemas: " + root.toString, root) } } - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala index ec03f1e148e69..b4093e58d3c07 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTableTypesOperation.scala @@ -45,7 +45,7 @@ private[hive] class SparkGetTableTypesOperation( override def close(): Unit = { super.close() - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } override def runInternal(): Unit = { @@ -61,7 +61,7 @@ private[hive] class SparkGetTableTypesOperation( authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null) } - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -80,16 +80,16 @@ private[hive] class SparkGetTableTypesOperation( setState(OperationState.ERROR) e match { case hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error getting table types: " + root.toString, root) } } - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala index bf9cf7ad46d95..45c6d980aac47 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala @@ -59,7 +59,7 @@ private[hive] class SparkGetTablesOperation( override def close(): Unit = { super.close() - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } override def runInternal(): Unit = { @@ -85,7 +85,7 @@ private[hive] class SparkGetTablesOperation( authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr) } - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -124,17 +124,17 @@ private[hive] class SparkGetTablesOperation( setState(OperationState.ERROR) e match { case 
hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error getting tables: " + root.toString, root) } } - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } private def addToRowSet( diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala index 0d263b09d57d3..dd5668a93f82d 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTypeInfoOperation.scala @@ -44,7 +44,7 @@ private[hive] class SparkGetTypeInfoOperation( override def close(): Unit = { super.close() - HiveThriftServer2.listener.onOperationClosed(statementId) + HiveThriftServer2.eventManager.onOperationClosed(statementId) } override def runInternal(): Unit = { @@ -60,7 +60,7 @@ private[hive] class SparkGetTypeInfoOperation( authorizeMetaGets(HiveOperationType.GET_TYPEINFO, null) } - HiveThriftServer2.listener.onStatementStart( + HiveThriftServer2.eventManager.onStatementStart( statementId, parentSession.getSessionHandle.getSessionId.toString, logMsg, @@ -98,16 +98,16 @@ private[hive] class SparkGetTypeInfoOperation( setState(OperationState.ERROR) e match { case hiveException: HiveSQLException => - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, hiveException.getMessage, SparkUtils.exceptionString(hiveException)) throw hiveException case _ => val root = ExceptionUtils.getRootCause(e) - HiveThriftServer2.listener.onStatementError( + HiveThriftServer2.eventManager.onStatementError( statementId, root.getMessage, SparkUtils.exceptionString(root)) throw new HiveSQLException("Error getting type info: " + root.toString, root) } } - HiveThriftServer2.listener.onStatementFinish(statementId) + HiveThriftServer2.eventManager.onStatementFinish(statementId) } } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala index 41b324d70c315..b3171897141c2 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLSessionManager.scala @@ -55,7 +55,7 @@ private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation, delegationToken) val session = super.getSession(sessionHandle) - HiveThriftServer2.listener.onSessionCreated( + HiveThriftServer2.eventManager.onSessionCreated( session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername) val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) { sqlContext @@ -74,7 +74,7 @@ private[hive] class 
SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: } override def closeSession(sessionHandle: SessionHandle): Unit = { - HiveThriftServer2.listener.onSessionClosed(sessionHandle.getSessionId.toString) + HiveThriftServer2.eventManager.onSessionClosed(sessionHandle.getSessionId.toString) val ctx = sparkSqlOperationManager.sessionToContexts.getOrDefault(sessionHandle, sqlContext) ctx.sparkSession.sessionState.catalog.getTempViewNames().foreach(ctx.uncacheTable) super.closeSession(sessionHandle) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala new file mode 100644 index 0000000000000..5cb78f6e64650 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2AppStatusStore.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver.ui + +import com.fasterxml.jackson.annotation.JsonIgnore +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState +import org.apache.spark.status.KVUtils.KVIndexParam +import org.apache.spark.util.kvstore.{KVIndex, KVStore} + +/** + * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's + * no state kept in this class, so it's ok to have multiple instances of it in an application. + */ +class HiveThriftServer2AppStatusStore( + store: KVStore, + val listener: Option[HiveThriftServer2Listener] = None) { + + def getSessionList: Seq[SessionInfo] = { + store.view(classOf[SessionInfo]).asScala.toSeq + } + + def getExecutionList: Seq[ExecutionInfo] = { + store.view(classOf[ExecutionInfo]).asScala.toSeq + } + + def getOnlineSessionNum: Int = { + store.view(classOf[SessionInfo]).asScala.count(_.finishTimestamp == 0) + } + + def getSession(sessionId: String): Option[SessionInfo] = { + try { + Some(store.read(classOf[SessionInfo], sessionId)) + } catch { + case _: NoSuchElementException => None + } + } + + def getExecution(executionId: String): Option[ExecutionInfo] = { + try { + Some(store.read(classOf[ExecutionInfo], executionId)) + } catch { + case _: NoSuchElementException => None + } + } + + /** + * When an error or a cancellation occurs, we set the finishTimestamp of the statement. + * Therefore, when we count the number of running statements, we need to exclude errors and + * cancellations and count all statements that have not been closed so far. 
+ */ + def getTotalRunning: Int = { + store.view(classOf[ExecutionInfo]).asScala.count(_.isExecutionActive) + } + + def getSessionCount: Long = { + store.count(classOf[SessionInfo]) + } + + def getExecutionCount: Long = { + store.count(classOf[ExecutionInfo]) + } +} + +private[thriftserver] class SessionInfo( + @KVIndexParam val sessionId: String, + val startTimestamp: Long, + val ip: String, + val userName: String, + val finishTimestamp: Long, + val totalExecution: Long) { + @JsonIgnore @KVIndex("finishTime") + private def finishTimeIndex: Long = if (finishTimestamp > 0L ) finishTimestamp else -1L + def totalTime: Long = { + if (finishTimestamp == 0L) { + System.currentTimeMillis - startTimestamp + } else { + finishTimestamp - startTimestamp + } + } +} + +private[thriftserver] class ExecutionInfo( + @KVIndexParam val execId: String, + val statement: String, + val sessionId: String, + val startTimestamp: Long, + val userName: String, + val finishTimestamp: Long, + val closeTimestamp: Long, + val executePlan: String, + val detail: String, + val state: ExecutionState.Value, + val jobId: ArrayBuffer[String], + val groupId: String) { + @JsonIgnore @KVIndex("finishTime") + private def finishTimeIndex: Long = if (finishTimestamp > 0L && !isExecutionActive) { + finishTimestamp + } else -1L + + @JsonIgnore @KVIndex("isExecutionActive") + def isExecutionActive: Boolean = { + !(state == ExecutionState.FAILED || + state == ExecutionState.CANCELED || + state == ExecutionState.CLOSED) + } + + def totalTime(endTime: Long): Long = { + if (endTime == 0L) { + System.currentTimeMillis - startTimestamp + } else { + endTime - startTimestamp + } + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala new file mode 100644 index 0000000000000..fa04c67896a69 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2EventManager.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.thriftserver.ui + +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.SparkListenerEvent + +/** + * This class manages events generated by the thriftserver application. It converts the + * operation and session events to listener events and post it into the live listener bus. 
+ */ +private[thriftserver] class HiveThriftServer2EventManager(sc: SparkContext) { + + def postLiveListenerBus(event: SparkListenerEvent): Unit = { + sc.listenerBus.post(event) + } + + def onSessionCreated(ip: String, sessionId: String, userName: String = "UNKNOWN"): Unit = { + postLiveListenerBus(SparkListenerThriftServerSessionCreated(ip, sessionId, + userName, System.currentTimeMillis())) + } + + def onSessionClosed(sessionId: String): Unit = { + postLiveListenerBus(SparkListenerThriftServerSessionClosed(sessionId, + System.currentTimeMillis())) + } + + def onStatementStart( + id: String, + sessionId: String, + statement: String, + groupId: String, + userName: String = "UNKNOWN"): Unit = { + postLiveListenerBus(SparkListenerThriftServerOperationStart(id, sessionId, statement, groupId, + System.currentTimeMillis(), userName)) + } + + def onStatementParsed(id: String, executionPlan: String): Unit = { + postLiveListenerBus(SparkListenerThriftServerOperationParsed(id, executionPlan)) + } + + def onStatementCanceled(id: String): Unit = { + postLiveListenerBus(SparkListenerThriftServerOperationCanceled(id, System.currentTimeMillis())) + } + + def onStatementError(id: String, errorMsg: String, errorTrace: String): Unit = { + postLiveListenerBus(SparkListenerThriftServerOperationError(id, errorMsg, errorTrace, + System.currentTimeMillis())) + } + + def onStatementFinish(id: String): Unit = { + postLiveListenerBus(SparkListenerThriftServerOperationFinish(id, System.currentTimeMillis())) + + } + + def onOperationClosed(id: String): Unit = { + postLiveListenerBus(SparkListenerThriftServerOperationClosed(id, System.currentTimeMillis())) + } +} + +private[thriftserver] case class SparkListenerThriftServerSessionCreated( + ip: String, + sessionId: String, + userName: String, + startTime: Long) extends SparkListenerEvent + +private[thriftserver] case class SparkListenerThriftServerSessionClosed( + sessionId: String, finishTime: Long) extends SparkListenerEvent + +private[thriftserver] case class SparkListenerThriftServerOperationStart( + id: String, + sessionId: String, + statement: String, + groupId: String, + startTime: Long, + userName: String = "UNKNOWN") extends SparkListenerEvent + +private[thriftserver] case class SparkListenerThriftServerOperationParsed( + id: String, + executionPlan: String) extends SparkListenerEvent + +private[thriftserver] case class SparkListenerThriftServerOperationCanceled( + id: String, finishTime: Long) extends SparkListenerEvent + +private[thriftserver] case class SparkListenerThriftServerOperationError( + id: String, + errorMsg: String, + errorTrace: String, + finishTime: Long) extends SparkListenerEvent + +private[thriftserver] case class SparkListenerThriftServerOperationFinish( + id: String, + finishTime: Long) extends SparkListenerEvent + +private[thriftserver] case class SparkListenerThriftServerOperationClosed( + id: String, + closeTime: Long) extends SparkListenerEvent + + diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala new file mode 100644 index 0000000000000..aec4125801f68 --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2HistoryServerPlugin.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.thriftserver.ui + +import org.apache.spark.SparkConf +import org.apache.spark.scheduler.SparkListener +import org.apache.spark.status.{AppHistoryServerPlugin, ElementTrackingStore} +import org.apache.spark.ui.SparkUI + +class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin { + + override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = { + Seq(new HiveThriftServer2Listener(store, conf, None, false)) + } + + override def setupUI(ui: SparkUI): Unit = { + val store = new HiveThriftServer2AppStatusStore(ui.store.store) + if (store.getSessionCount > 0) { + new ThriftServerTab(store, ui) + } + } + + override def displayOrder: Int = 1 +} + diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala new file mode 100644 index 0000000000000..6d0a506fa94dc --- /dev/null +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.thriftserver.ui + +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.hive.service.server.HiveServer2 + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD +import org.apache.spark.scheduler._ +import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity} + +/** + * An inner sparkListener called in sc.stop to clean up the HiveThriftServer2 + */ +private[thriftserver] class HiveThriftServer2Listener( + kvstore: ElementTrackingStore, + sparkConf: SparkConf, + server: Option[HiveServer2], + live: Boolean = true) extends SparkListener { + + private val sessionList = new ConcurrentHashMap[String, LiveSessionData]() + private val executionList = new ConcurrentHashMap[String, LiveExecutionData]() + + private val (retainedStatements: Int, retainedSessions: Int) = { + (sparkConf.get(SQLConf.THRIFTSERVER_UI_STATEMENT_LIMIT), + sparkConf.get(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT)) + } + + // How often to update live entities. -1 means "never update" when replaying applications, + // meaning only the last write will happen. For live applications, this avoids a few + // operations that we can live without when rapidly processing incoming events. + private val liveUpdatePeriodNs = if (live) sparkConf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L + + // Returns true if this listener has no live data. Exposed for tests only. + private[thriftserver] def noLiveData(): Boolean = { + sessionList.isEmpty && executionList.isEmpty + } + + kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count => + cleanupSession(count) + } + + kvstore.addTrigger(classOf[ExecutionInfo], retainedStatements) { count => + cleanupExecutions(count) + } + + kvstore.onFlush { + if (!live) { + flush((entity: LiveEntity) => updateStoreWithTriggerEnabled(entity)) + } + } + + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { + if (live) { + server.foreach(_.stop()) + } + } + + override def onJobStart(jobStart: SparkListenerJobStart): Unit = { + val properties = jobStart.properties + if (properties != null) { + val groupId = properties.getProperty(SparkContext.SPARK_JOB_GROUP_ID) + if (groupId != null) { + updateJobDetails(jobStart.jobId.toString, groupId) + } + } + } + + private def updateJobDetails(jobId: String, groupId: String): Unit = { + val execList = executionList.values().asScala.filter(_.groupId == groupId).toSeq + if (execList.nonEmpty) { + execList.foreach { exec => + exec.jobId += jobId.toString + updateLiveStore(exec) + } + } else { + // It may possible that event reordering happens, such a way that JobStart event come after + // Execution end event (Refer SPARK-27019). To handle that situation, if occurs in + // Thriftserver, following code will take care. Here will come only if JobStart event comes + // after Execution End event. 
+ val storeExecInfo = kvstore.view(classOf[ExecutionInfo]).asScala.filter(_.groupId == groupId) + storeExecInfo.foreach { exec => + val liveExec = getOrCreateExecution(exec.execId, exec.statement, exec.sessionId, + exec.startTimestamp, exec.userName) + liveExec.jobId += jobId.toString + updateStoreWithTriggerEnabled(liveExec) + executionList.remove(liveExec.execId) + } + } + } + + override def onOtherEvent(event: SparkListenerEvent): Unit = { + event match { + case e: SparkListenerThriftServerSessionCreated => onSessionCreated(e) + case e: SparkListenerThriftServerSessionClosed => onSessionClosed(e) + case e: SparkListenerThriftServerOperationStart => onOperationStart(e) + case e: SparkListenerThriftServerOperationParsed => onOperationParsed(e) + case e: SparkListenerThriftServerOperationCanceled => onOperationCanceled(e) + case e: SparkListenerThriftServerOperationError => onOperationError(e) + case e: SparkListenerThriftServerOperationFinish => onOperationFinished(e) + case e: SparkListenerThriftServerOperationClosed => onOperationClosed(e) + case _ => // Ignore + } + } + + private def onSessionCreated(e: SparkListenerThriftServerSessionCreated): Unit = { + val session = getOrCreateSession(e.sessionId, e.startTime, e.ip, e.userName) + sessionList.put(e.sessionId, session) + updateLiveStore(session) + } + + private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = { + val session = sessionList.get(e.sessionId) + session.finishTimestamp = e.finishTime + updateStoreWithTriggerEnabled(session) + sessionList.remove(e.sessionId) + } + + private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = { + val info = getOrCreateExecution( + e.id, + e.statement, + e.sessionId, + e.startTime, + e.userName) + + info.state = ExecutionState.STARTED + executionList.put(e.id, info) + sessionList.get(e.sessionId).totalExecution += 1 + executionList.get(e.id).groupId = e.groupId + updateLiveStore(executionList.get(e.id)) + updateLiveStore(sessionList.get(e.sessionId)) + } + + private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = { + executionList.get(e.id).executePlan = e.executionPlan + executionList.get(e.id).state = ExecutionState.COMPILED + updateLiveStore(executionList.get(e.id)) + } + + private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = { + executionList.get(e.id).finishTimestamp = e.finishTime + executionList.get(e.id).state = ExecutionState.CANCELED + updateLiveStore(executionList.get(e.id)) + } + + private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = { + executionList.get(e.id).finishTimestamp = e.finishTime + executionList.get(e.id).detail = e.errorMsg + executionList.get(e.id).state = ExecutionState.FAILED + updateLiveStore(executionList.get(e.id)) + } + + private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = { + executionList.get(e.id).finishTimestamp = e.finishTime + executionList.get(e.id).state = ExecutionState.FINISHED + updateLiveStore(executionList.get(e.id)) + } + + private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = { + executionList.get(e.id).closeTimestamp = e.closeTime + executionList.get(e.id).state = ExecutionState.CLOSED + updateStoreWithTriggerEnabled(executionList.get(e.id)) + executionList.remove(e.id) + } + + // Update both live and history stores. Trigger is enabled by default, hence + // it will cleanup the entity which exceeds the threshold. 
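Because all listener state reaches the UI through the two write paths defined next, the listener can be exercised without a running Thrift server by handing it events directly. A minimal sketch, assuming an `InMemoryStore`-backed `ElementTrackingStore` — the new `HiveThriftServer2ListenerSuite` tests this path, though the snippet below is illustrative rather than the suite's code:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.util.kvstore.InMemoryStore

val conf = new SparkConf()
val kvstore = new ElementTrackingStore(new InMemoryStore, conf)
val listener = new HiveThriftServer2Listener(kvstore, conf, server = None)

// Post the same events the event manager would put on the bus.
listener.onOtherEvent(SparkListenerThriftServerSessionCreated(
  "localhost", "session1", "alice", System.currentTimeMillis()))
listener.onOtherEvent(SparkListenerThriftServerOperationStart(
  "op1", "session1", "SELECT 1", "group1", System.currentTimeMillis(), "alice"))

// Read back through the status store, exactly as the UI pages do.
val store = new HiveThriftServer2AppStatusStore(kvstore, Some(listener))
assert(store.getOnlineSessionNum == 1)
assert(store.getTotalRunning == 1)
```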
+ def updateStoreWithTriggerEnabled(entity: LiveEntity): Unit = { + entity.write(kvstore, System.nanoTime(), checkTriggers = true) + } + + // Update only live stores. If trigger is enabled, it will cleanup entity + // which exceeds the threshold. + def updateLiveStore(entity: LiveEntity, trigger: Boolean = false): Unit = { + val now = System.nanoTime() + if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) { + entity.write(kvstore, now, checkTriggers = trigger) + } + } + + /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ + private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { + sessionList.values.asScala.foreach(entityFlushFunc) + executionList.values.asScala.foreach(entityFlushFunc) + } + + private def getOrCreateSession( + sessionId: String, + startTime: Long, + ip: String, + username: String): LiveSessionData = { + sessionList.computeIfAbsent(sessionId, + (_: String) => new LiveSessionData(sessionId, startTime, ip, username)) + } + + private def getOrCreateExecution( + execId: String, statement: String, + sessionId: String, startTimestamp: Long, + userName: String): LiveExecutionData = { + executionList.computeIfAbsent(execId, + (_: String) => new LiveExecutionData(execId, statement, sessionId, startTimestamp, userName)) + } + + private def cleanupExecutions(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, retainedStatements) + if (countToDelete <= 0L) { + return + } + val view = kvstore.view(classOf[ExecutionInfo]).index("finishTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => + j.finishTimestamp != 0 + } + toDelete.foreach { j => kvstore.delete(j.getClass, j.execId) } + } + + private def cleanupSession(count: Long): Unit = { + val countToDelete = calculateNumberToRemove(count, retainedSessions) + if (countToDelete <= 0L) { + return + } + val view = kvstore.view(classOf[SessionInfo]).index("finishTime").first(0L) + val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j => + j.finishTimestamp != 0L + } + + toDelete.foreach { j => kvstore.delete(j.getClass, j.sessionId) } + } + + /** + * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done + * asynchronously, this method may return 0 in case enough items have been deleted already. 
+ */ + private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = { + if (dataSize > retainedSize) { + math.max(retainedSize / 10L, dataSize - retainedSize) + } else { + 0L + } + } +} + +private[thriftserver] class LiveExecutionData( + val execId: String, + val statement: String, + val sessionId: String, + val startTimestamp: Long, + val userName: String) extends LiveEntity { + + var finishTimestamp: Long = 0L + var closeTimestamp: Long = 0L + var executePlan: String = "" + var detail: String = "" + var state: ExecutionState.Value = ExecutionState.STARTED + val jobId: ArrayBuffer[String] = ArrayBuffer[String]() + var groupId: String = "" + + override protected def doUpdate(): Any = { + new ExecutionInfo( + execId, + statement, + sessionId, + startTimestamp, + userName, + finishTimestamp, + closeTimestamp, + executePlan, + detail, + state, + jobId, + groupId) + } +} + +private[thriftserver] class LiveSessionData( + val sessionId: String, + val startTimeStamp: Long, + val ip: String, + val username: String) extends LiveEntity { + + var finishTimestamp: Long = 0L + var totalExecution: Int = 0 + + override protected def doUpdate(): Any = { + new SessionInfo( + sessionId, + startTimeStamp, + ip, + username, + finishTimestamp, + totalExecution) + } +} diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala index d3351f3d6ca14..adfda0c56585f 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala @@ -28,7 +28,6 @@ import scala.xml.{Node, Unparsed} import org.apache.commons.text.StringEscapeUtils import org.apache.spark.internal.Logging -import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, SessionInfo} import org.apache.spark.sql.hive.thriftserver.ui.ToolTips._ import org.apache.spark.ui._ import org.apache.spark.ui.UIUtils._ @@ -36,23 +35,24 @@ import org.apache.spark.util.Utils /** Page for Spark Web UI that shows statistics of the thrift server */ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging { - - private val listener = parent.listener - private val startTime = Calendar.getInstance().getTime() + private val store = parent.store + private val startTime = parent.startTime /** Render the page */ def render(request: HttpServletRequest): Seq[Node] = { - val content = - listener.synchronized { // make sure all parts in this page are consistent - generateBasicStats() ++ -
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
index d3351f3d6ca14..adfda0c56585f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPage.scala
@@ -28,7 +28,6 @@ import scala.xml.{Node, Unparsed}
 import org.apache.commons.text.StringEscapeUtils
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, SessionInfo}
 import org.apache.spark.sql.hive.thriftserver.ui.ToolTips._
 import org.apache.spark.ui._
 import org.apache.spark.ui.UIUtils._
@@ -36,23 +35,24 @@ import org.apache.spark.util.Utils
 
 /** Page for Spark Web UI that shows statistics of the thrift server */
 private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage("") with Logging {
-
-  private val listener = parent.listener
-  private val startTime = Calendar.getInstance().getTime()
+  private val store = parent.store
+  private val startTime = parent.startTime
 
   /** Render the page */
   def render(request: HttpServletRequest): Seq[Node] = {
-    val content =
-      listener.synchronized { // make sure all parts in this page are consistent
-        generateBasicStats() ++
-        <br/> ++
+    val content = store.synchronized { // make sure all parts in this page are consistent
+      generateBasicStats() ++
+      <br/> ++
       <h4>
-        {listener.getOnlineSessionNum} session(s) are online,
-        running {listener.getTotalRunning} SQL statement(s)
+        {store.getOnlineSessionNum}
+        session(s) are online,
+        running
+        {store.getTotalRunning}
+        SQL statement(s)
       </h4> ++
       generateSessionStatsTable(request) ++
       generateSQLStatsTable(request)
-      }
+    }
     UIUtils.headerSparkPage(request, "JDBC/ODBC Server", content, parent)
   }
@@ -72,7 +72,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
 
   /** Generate stats of batch statements of the thrift server program */
   private def generateSQLStatsTable(request: HttpServletRequest): Seq[Node] = {
-    val numStatement = listener.getExecutionList.size
+    val numStatement = store.getExecutionList.size
 
     val table = if (numStatement > 0) {
 
@@ -103,7 +103,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
       Some(new SqlStatsPagedTable(
         request,
         parent,
-        listener.getExecutionList,
+        store.getExecutionList,
         "sqlserver",
         UIUtils.prependBaseUri(request, parent.basePath),
         parameterOtherTable,
@@ -141,7 +141,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
 
   /** Generate stats of batch sessions of the thrift server program */
   private def generateSessionStatsTable(request: HttpServletRequest): Seq[Node] = {
-    val numSessions = listener.getSessionList.size
+    val numSessions = store.getSessionList.size
     val table = if (numSessions > 0) {
 
       val sessionTableTag = "sessionstat"
@@ -171,7 +171,7 @@ private[ui] class ThriftServerPage(parent: ThriftServerTab) extends WebUIPage(""
       Some(new SessionStatsPagedTable(
         request,
         parent,
-        listener.getSessionList,
+        store.getSessionList,
         "sqlserver",
         UIUtils.prependBaseUri(request, parent.basePath),
         parameterOtherTable,
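The page-side changes are mechanical: every read that used to poke the live listener now goes through the status store, so the same page can render from a running server or from history data. A sketch of the minimal contract the summary header depends on (the `StatusStoreView` trait is illustrative, covering only the two getters used in the hunk above):

```scala
// Illustrative view of what the summary header reads; not Spark's class.
trait StatusStoreView {
  def getOnlineSessionNum: Int
  def getTotalRunning: Int
}

def summaryLine(store: StatusStoreView): String =
  s"${store.getOnlineSessionNum} session(s) are online, " +
    s"running ${store.getTotalRunning} SQL statement(s)"
```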
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
index aa2f495db5651..c46c3d6b68a43 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.hive.thriftserver.ui
 
-import java.util.Calendar
 import javax.servlet.http.HttpServletRequest
 
 import scala.collection.JavaConverters._
@@ -31,18 +30,16 @@ import org.apache.spark.util.Utils
 /** Page for Spark Web UI that shows statistics of jobs running in the thrift server */
 private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
   extends WebUIPage("session") with Logging {
-
-  private val listener = parent.listener
-  private val startTime = Calendar.getInstance().getTime()
+  val store = parent.store
+  private val startTime = parent.startTime
 
   /** Render the page */
   def render(request: HttpServletRequest): Seq[Node] = {
     val parameterId = request.getParameter("id")
     require(parameterId != null && parameterId.nonEmpty, "Missing id parameter")
 
-    val content =
-      listener.synchronized { // make sure all parts in this page are consistent
-        val sessionStat = listener.getSession(parameterId).getOrElse(null)
+    val content = store.synchronized { // make sure all parts in this page are consistent
+      val sessionStat = store.getSession(parameterId).orNull
       require(sessionStat != null, "Invalid sessionID[" + parameterId + "]")
 
       generateBasicStats() ++
@@ -73,7 +70,7 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab)
 
   /** Generate stats of batch statements of the thrift server program */
   private def generateSQLStatsTable(request: HttpServletRequest, sessionID: String): Seq[Node] = {
-    val executionList = listener.getExecutionList
+    val executionList = store.getExecutionList
       .filter(_.sessionId == sessionID)
     val numStatement = executionList.size
     val table = if (numStatement > 0) {
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
index 8efb2c3311cfe..6d783b1c555a7 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerTab.scala
@@ -19,28 +19,25 @@ package org.apache.spark.sql.hive.thriftserver.ui
 
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
-import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab._
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 /**
  * Spark Web UI tab that shows statistics of jobs running in the thrift server.
  * This assumes the given SparkContext has enabled its SparkUI.
  */
-private[thriftserver] class ThriftServerTab(sparkContext: SparkContext)
-  extends SparkUITab(getSparkUI(sparkContext), "sqlserver") with Logging {
-
+private[thriftserver] class ThriftServerTab(
+    val store: HiveThriftServer2AppStatusStore,
+    sparkUI: SparkUI) extends SparkUITab(sparkUI, "sqlserver") with Logging {
   override val name = "JDBC/ODBC Server"
 
-  val parent = getSparkUI(sparkContext)
-  val listener = HiveThriftServer2.listener
+  val parent = sparkUI
+  val startTime = sparkUI.store.applicationInfo().attempts.head.startTime
 
   attachPage(new ThriftServerPage(this))
   attachPage(new ThriftServerSessionPage(this))
   parent.attachTab(this)
-
   def detach(): Unit = {
-    getSparkUI(sparkContext).detachTab(this)
+    sparkUI.detachTab(this)
   }
 }
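With `ThriftServerTab` taking its store and `SparkUI` as constructor arguments instead of reaching for the global `HiveThriftServer2.listener`, the history server can construct the same tab from a plugin. Plugin discovery itself goes through the standard `java.util.ServiceLoader` mechanism behind the `META-INF/services` registration; a minimal sketch of that discovery with a stand-in `Plugin` trait (not Spark's `AppHistoryServerPlugin`):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

trait Plugin {
  def displayOrder: Int // lower values are attached to the UI first
}

// Instantiates every implementation registered under
// META-INF/services/<fully.qualified.Plugin> on the given class loader.
def loadPlugins(loader: ClassLoader): Seq[Plugin] =
  ServiceLoader.load(classOf[Plugin], loader).asScala.toSeq

// Tabs can then be attached in a deterministic order:
// loadPlugins(getClass.getClassLoader).sortBy(_.displayOrder).foreach(p => ...)
```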
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
new file mode 100644
index 0000000000000..075032fa5d099
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.thriftserver.ui
+
+import java.util.Properties
+
+import org.mockito.Mockito.{mock, RETURNS_SMART_NULLS}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.internal.config.Status.{ASYNC_TRACKING_ENABLED, LIVE_ENTITY_UPDATE_PERIOD}
+import org.apache.spark.scheduler.SparkListenerJobStart
+import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
+
+class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var kvstore: ElementTrackingStore = _
+
+  after {
+    if (kvstore != null) {
+      kvstore.close()
+      kvstore = null
+    }
+  }
+
+  Seq(true, false).foreach { live =>
+    test(s"listener events should store successfully (live = $live)") {
+      val (statusStore: HiveThriftServer2AppStatusStore,
+        listener: HiveThriftServer2Listener) = createAppStatusStore(live)
+
+      listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionId",
+        "user", System.currentTimeMillis()))
+      listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", "sessionId",
+        "dummy query", "groupId", System.currentTimeMillis(), "user"))
+      listener.onOtherEvent(SparkListenerThriftServerOperationParsed("id", "dummy plan"))
+      listener.onJobStart(SparkListenerJobStart(
+        0,
+        System.currentTimeMillis(),
+        Nil,
+        createProperties))
+      listener.onOtherEvent(SparkListenerThriftServerOperationFinish("id",
+        System.currentTimeMillis()))
+      listener.onOtherEvent(SparkListenerThriftServerOperationClosed("id",
+        System.currentTimeMillis()))
+
+      if (live) {
+        assert(statusStore.getOnlineSessionNum === 1)
+      }
+
+      listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId",
+        System.currentTimeMillis()))
+
+      if (!live) {
+        // Flush deferred updates into the history store
+        kvstore.close(false)
+      }
+      assert(statusStore.getOnlineSessionNum === 0)
+      assert(statusStore.getExecutionList.size === 1)
+
+      val storeExecData = statusStore.getExecutionList.head
+
+      assert(storeExecData.execId === "id")
+      assert(storeExecData.sessionId === "sessionId")
+      assert(storeExecData.executePlan === "dummy plan")
+      assert(storeExecData.jobId === Seq("0"))
+      assert(listener.noLiveData())
+    }
+  }
+
+  Seq(true, false).foreach { live =>
+    test(s"cleanup sessions if the count exceeds the threshold (live = $live)") {
+      val (statusStore: HiveThriftServer2AppStatusStore,
+        listener: HiveThriftServer2Listener) = createAppStatusStore(live)
+      var time = 0
+      listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionId1",
+        "user", time))
+      time += 1
+      listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionId2",
+        "user", time))
+      time += 1
+      listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId1", time))
+      time += 1
+      listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId2", time))
+      listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionId3",
+        "user", time))
+      time += 1
+      listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId3", time))
+
+      if (!live) {
+        kvstore.close(false)
+      }
+      assert(statusStore.getOnlineSessionNum === 0)
+      assert(statusStore.getSessionCount === 1)
+      assert(statusStore.getSession("sessionId1") === None)
+      assert(listener.noLiveData())
+    }
+  }
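The `live = false` variants above model history replay: a non-live listener defers its writes until the store is flushed, which is what the `kvstore.close(false)` calls force before the assertions (flushing the `ElementTrackingStore` without closing the backing store). A stand-in sketch of that write-through versus buffer-until-flush distinction (not Spark's implementation):

```scala
import scala.collection.mutable

// live = true  -> write through immediately (running Thrift server UI)
// live = false -> buffer and persist only on flush (history replay)
class SketchWriter(live: Boolean) {
  private val pending = mutable.Buffer[String]()
  val persisted = mutable.Buffer[String]()

  def update(value: String): Unit =
    if (live) persisted += value else pending += value

  // Analogous to flushing the store on close.
  def flush(): Unit = {
    persisted ++= pending
    pending.clear()
  }
}
```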
end event") { + val (statusStore: HiveThriftServer2AppStatusStore, + listener: HiveThriftServer2Listener) = createAppStatusStore(true) + + listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionId", "user", + System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", "sessionId", "dummy query", + "groupId", System.currentTimeMillis(), "user")) + listener.onOtherEvent(SparkListenerThriftServerOperationParsed("id", "dummy plan")) + listener.onOtherEvent(SparkListenerThriftServerOperationFinish("id", + System.currentTimeMillis())) + listener.onOtherEvent(SparkListenerThriftServerOperationClosed("id", + System.currentTimeMillis())) + listener.onJobStart(SparkListenerJobStart( + 0, + System.currentTimeMillis(), + Nil, + createProperties)) + listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionId", + System.currentTimeMillis())) + val exec = statusStore.getExecution("id") + assert(exec.isDefined) + assert(exec.get.jobId === Seq("0")) + assert(listener.noLiveData()) + } + + private def createProperties: Properties = { + val properties = new Properties() + properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId") + properties + } + + private def createAppStatusStore(live: Boolean) = { + val sparkConf = new SparkConf() + sparkConf.set(ASYNC_TRACKING_ENABLED, false) + .set(SQLConf.THRIFTSERVER_UI_SESSION_LIMIT, 1) + .set(LIVE_ENTITY_UPDATE_PERIOD, 0L) + kvstore = new ElementTrackingStore(new InMemoryStore, sparkConf) + if (live) { + val server = mock(classOf[HiveThriftServer2], RETURNS_SMART_NULLS) + val listener = new HiveThriftServer2Listener(kvstore, sparkConf, Some(server)) + (new HiveThriftServer2AppStatusStore(kvstore, Some(listener)), listener) + } else { + (new HiveThriftServer2AppStatusStore(kvstore), + new HiveThriftServer2Listener(kvstore, sparkConf, None, false)) + } + } +} diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala index 6f040d3f1095b..9f3c2957a182d 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala @@ -17,39 +17,64 @@ package org.apache.spark.sql.hive.thriftserver.ui -import java.util.Locale +import java.util.{Calendar, Locale} import javax.servlet.http.HttpServletRequest import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS} +import org.scalatest.BeforeAndAfter -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.scheduler.SparkListenerJobStart -import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 -import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.HiveThriftServer2Listener -import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.hive.thriftserver._ +import org.apache.spark.status.ElementTrackingStore +import org.apache.spark.util.kvstore.InMemoryStore -class ThriftServerPageSuite extends SparkFunSuite { + +class ThriftServerPageSuite extends SparkFunSuite with BeforeAndAfter { + + private var kvstore: ElementTrackingStore = _ + + after { + if (kvstore != null) { + kvstore.close() + kvstore = null + } + } /** - * Run a dummy session and return the listener + * Run a dummy session and return the store */ - private def 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala
index 6f040d3f1095b..9f3c2957a182d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerPageSuite.scala
@@ -17,39 +17,64 @@
 package org.apache.spark.sql.hive.thriftserver.ui
 
-import java.util.Locale
+import java.util.{Calendar, Locale}
 import javax.servlet.http.HttpServletRequest
 
 import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
+import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.scheduler.SparkListenerJobStart
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
-import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.HiveThriftServer2Listener
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.hive.thriftserver._
+import org.apache.spark.status.ElementTrackingStore
+import org.apache.spark.util.kvstore.InMemoryStore
 
-class ThriftServerPageSuite extends SparkFunSuite {
+class ThriftServerPageSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var kvstore: ElementTrackingStore = _
+
+  after {
+    if (kvstore != null) {
+      kvstore.close()
+      kvstore = null
+    }
+  }
 
   /**
-   * Run a dummy session and return the listener
+   * Run a dummy session and return the store
    */
-  private def getListener: HiveThriftServer2Listener = {
-    val listener = new HiveThriftServer2Listener(mock(classOf[HiveThriftServer2]), new SQLConf)
-
-    listener.onSessionCreated("localhost", "sessionid", "user")
-    listener.onStatementStart("id", "sessionid", "dummy query", "groupid", "user")
-    listener.onStatementParsed("id", "dummy plan")
-    listener.onJobStart(SparkListenerJobStart(0, System.currentTimeMillis(), Seq()))
-    listener.onStatementFinish("id")
-    listener.onOperationClosed("id")
-    listener.onSessionClosed("sessionid")
-    listener
+  private def getStatusStore: HiveThriftServer2AppStatusStore = {
+    val sparkConf = new SparkConf()
+    kvstore = new ElementTrackingStore(new InMemoryStore, sparkConf)
+    val server = mock(classOf[HiveThriftServer2], RETURNS_SMART_NULLS)
+
+    val listener = new HiveThriftServer2Listener(kvstore, sparkConf, Some(server))
+    val statusStore = new HiveThriftServer2AppStatusStore(kvstore, Some(listener))
+
+    listener.onOtherEvent(SparkListenerThriftServerSessionCreated("localhost", "sessionid", "user",
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", "sessionid",
+      "dummy query", "groupid", System.currentTimeMillis(), "user"))
+    listener.onOtherEvent(SparkListenerThriftServerOperationParsed("id", "dummy plan"))
+    listener.onJobStart(SparkListenerJobStart(0, System.currentTimeMillis(), Seq()))
+    listener.onOtherEvent(SparkListenerThriftServerOperationFinish("id",
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerThriftServerOperationClosed("id",
+      System.currentTimeMillis()))
+    listener.onOtherEvent(SparkListenerThriftServerSessionClosed("sessionid",
+      System.currentTimeMillis()))
+
+    statusStore
   }
 
   test("thriftserver page should load successfully") {
+    val store = getStatusStore
+
     val request = mock(classOf[HttpServletRequest])
     val tab = mock(classOf[ThriftServerTab], RETURNS_SMART_NULLS)
-    when(tab.listener).thenReturn(getListener)
+    when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+    when(tab.store).thenReturn(store)
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
     val page = new ThriftServerPage(tab)
@@ -70,10 +95,13 @@ class ThriftServerPageSuite extends SparkFunSuite {
   }
 
   test("thriftserver session page should load successfully") {
+    val store = getStatusStore
+
     val request = mock(classOf[HttpServletRequest])
     when(request.getParameter("id")).thenReturn("sessionid")
     val tab = mock(classOf[ThriftServerTab], RETURNS_SMART_NULLS)
-    when(tab.listener).thenReturn(getListener)
+    when(tab.startTime).thenReturn(Calendar.getInstance().getTime)
+    when(tab.store).thenReturn(store)
     when(tab.appName).thenReturn("testing")
     when(tab.headerTabs).thenReturn(Seq.empty)
     val page = new ThriftServerSessionPage(tab)