Skip to content

Commit

Permalink
[SPARK-23670][SQL] Fix memory leak on SparkPlanGraphWrapper
Browse files Browse the repository at this point in the history
Clean up SparkPlanGraphWrapper objects from InMemoryStore together with cleaning up SQLExecutionUIData
existing unit test was extended to check also SparkPlanGraphWrapper object count

vanzin

Author: myroslavlisniak <acnipin@gmail.com>

Closes #20813 from myroslavlisniak/master.

(cherry picked from commit c2632ed)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
myroslavlisniak authored and Marcelo Vanzin committed Mar 16, 2018
1 parent 99f5c0b commit d9e1f70
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ class SQLAppStatusListener(

val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
toDelete.foreach { e =>
kvstore.delete(e.getClass(), e.executionId)
kvstore.delete(classOf[SparkPlanGraphWrapper], e.executionId)
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ class SQLAppStatusStore(
store.count(classOf[SQLExecutionUIData])
}

def planGraphCount(): Long = {
store.count(classOf[SparkPlanGraphWrapper])
}

def executionMetrics(executionId: Long): Map[Long, String] = {
def metricsFromStore(): Option[Map[Long, String]] = {
val exec = store.read(classOf[SQLExecutionUIData], executionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
sc.listenerBus.waitUntilEmpty(10000)
val statusStore = spark.sharedState.statusStore
assert(statusStore.executionsCount() <= 50)
assert(statusStore.planGraphCount() <= 50)
// No live data should be left behind after all executions end.
assert(statusStore.listener.get.noLiveData())
}
Expand Down

0 comments on commit d9e1f70

Please sign in to comment.