Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
shahidki31 committed Nov 5, 2019
1 parent e965c5f commit 5c2932e
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ object HiveThriftServer2 extends Logging {
server.start()
val kvStore = SparkSQLEnv.sqlContext.sparkContext
.statusStore.store.asInstanceOf[ElementTrackingStore]
listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(sqlContext))
sqlContext.sparkContext.addSparkListener(listener)
uiTab = if (sqlContext.sparkContext.getConf.get(UI_ENABLED)) {
val sc = sqlContext.sparkContext
listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(sqlContext), Some(sc))
sc.addSparkListener(listener)
uiTab = if (sc.getConf.get(UI_ENABLED)) {
Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
getSparkUI(sqlContext.sparkContext)))
} else {
Expand Down Expand Up @@ -108,9 +109,11 @@ object HiveThriftServer2 extends Logging {
server.start()
logInfo("HiveThriftServer2 started")
val kvStore = SparkSQLEnv.sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
listener = new HiveThriftServer2Listener(kvStore, Some(server), Some(SparkSQLEnv.sqlContext))
SparkSQLEnv.sparkContext.addSparkListener(listener)
uiTab = if (SparkSQLEnv.sparkContext.getConf.get(UI_ENABLED)) {
val sc = SparkSQLEnv.sparkContext
listener = new HiveThriftServer2Listener(kvStore, Some(server),
Some(SparkSQLEnv.sqlContext), Some(sc))
sc.addSparkListener(listener)
uiTab = if (sc.getConf.get(UI_ENABLED)) {
Some(new ThriftServerTab(new HiveThriftServer2AppStatusStore(kvStore, Some(listener)),
getSparkUI(SparkSQLEnv.sparkContext)))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.ui.SparkUI
class HiveThriftServer2HistoryServerPlugin extends AppHistoryServerPlugin {

override def createListeners(conf: SparkConf, store: ElementTrackingStore): Seq[SparkListener] = {
Seq(new HiveThriftServer2Listener(store, None, None, Some(conf), false))
Seq(new HiveThriftServer2Listener(store, None, None, None, Some(conf), false))
}

override def setupUI(ui: SparkUI): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ private[thriftserver] class HiveThriftServer2Listener(
kvstore: ElementTrackingStore,
server: Option[HiveServer2],
sqlContext: Option[SQLContext],
sc: Option[SparkContext],
sparkConf: Option[SparkConf] = None,
live: Boolean = true) extends SparkListener {

Expand All @@ -56,12 +57,6 @@ private[thriftserver] class HiveThriftServer2Listener(
}
}

private val sc: Option[SparkContext] = if (live) {
Some(sqlContext.get.sparkContext)
} else {
None
}

kvstore.addTrigger(classOf[SessionInfo], retainedSessions) { count =>
cleanupSession(count)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite
val sqlConf = new SQLConf
when(sqlContext.conf).thenReturn(sqlConf)
when(sqlContext.sparkContext).thenReturn(sc)
val listener = new HiveThriftServer2Listener(kvstore, Some(server), Some(sqlContext))
val listener = new HiveThriftServer2Listener(kvstore, Some(server), Some(sqlContext), Some(sc))
new HiveThriftServer2AppStatusStore(kvstore, Some(listener))
}

Expand Down

0 comments on commit 5c2932e

Please sign in to comment.