From 5c2932e2d8162922c78fee3a4e2fa353bfa1472a Mon Sep 17 00:00:00 2001 From: shahid Date: Wed, 6 Nov 2019 01:01:18 +0530 Subject: [PATCH] add --- .../sql/hive/thriftserver/HiveThriftServer2.scala | 15 +++++++++------ .../HiveThriftServer2HistoryServerPlugin.scala | 2 +- .../thriftserver/HiveThriftServer2Listener.scala | 7 +------ .../HiveThriftServer2ListenerSuite.scala | 2 +- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala index 3e0233f36dc40..f98f4895c7e24 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala @@ -61,9 +61,10 @@ object HiveThriftServer2 extends Logging { server.start() val kvStore = SparkSQLEnv.sqlContext.sparkContext .statusStore.store.asInstanceOf[ElementTrackingStore] - listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(sqlContext)) - sqlContext.sparkContext.addSparkListener(listener) - uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) { + val sc = sqlContext.sparkContext + listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(sqlContext), Some(sc)) + sc.addSparkListener(listener) + uiTab = if (sc.getConf.get(UI_ENABLED)) { Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)), getSparkUI(sqlContext.sparkContext))) } else { @@ -108,9 +109,11 @@ object HiveThriftServer2 extends Logging { server.start() logInfo("HiveThriftServer2 started") val kvStore = SparkSQLEnv.sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore] - listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(SparkSQLEnv.sqlContext)) - SparkSQLEnv.sparkContext.addSparkListener(listener) - uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) { + val sc = SparkSQLEnv.sparkContext + listener = new HiveThriftServer2Listener(kvStore, Some(server), + Some(SparkSQLEnv.sqlContext), Some(sc)) + sc.addSparkListener(listener) + uiTab = if (sc.getConf.get(UI_ENABLED)) { Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)), getSparkUI(SparkSQLEnv.sparkContext))) } else { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2HistoryServerPlugin.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2HistoryServerPlugin.scala index 6010daaef6b04..1dcb3ee4f2f8a 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2HistoryServerPlugin.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2HistoryServerPlugin.scala @@ -26,7 +26,7 @@ import org.apache.spark.ui.SparkUI class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin { override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = { - Seq(new HiveThriftServer2Listener(store, None, None, Some(conf), false)) + Seq(new HiveThriftServer2Listener(store, None, None, None, Some(conf), false)) } override def setupUI(ui: SparkUI): Unit = { diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala index 44018d158dfe2..e9a1227985757 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Listener.scala @@ -38,6 +38,7 @@ private[thriftserver] class HiveThriftServer2Listener( kvstore: ElementTrackingStore, server: Option[HiveServer2], sqlContext: Option[SQLContext], + sc: Option[SparkContext], sparkConf: Option[SparkConf] = None, live: Boolean = true) extends SparkListener { @@ -56,12 +57,6 @@ private[thriftserver] class HiveThriftServer2Listener( } } - private val sc: Option[SparkContext] = if (live) { - Some(sqlContext.get.sparkContext) - } else { - None - } - kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count => cleanupSession(count) } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2ListenerSuite.scala index b684178058857..03c09107df1b5 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2ListenerSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2ListenerSuite.scala @@ -56,7 +56,7 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite val sqlConf = new SQLConf when(sqlContext.conf).thenReturn(sqlConf) when(sqlContext.sparkContext).thenReturn(sc) - val listener = new HiveThriftServer2Listener(kvstore, Some(server), Some(sqlContext)) + val listener = new HiveThriftServer2Listener(kvstore, Some(server), Some(sqlContext), Some(sc)) new HiveThriftServer2AppStatusStore(kvstore, Some(listener)) }