From 3c72cd8bb3237454e0866deb274d80db1d07f897 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 14 Mar 2015 21:08:07 -0400 Subject: [PATCH] address the comments --- .../spark/util/AsynchronousListenerBus.scala | 2 ++ .../main/scala/org/apache/spark/util/Utils.scala | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala index ce6e314463baa..ce7887b76ff96 100644 --- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala @@ -92,6 +92,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri * This first sends out all buffered events posted before this listener bus has started, then * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. + * + * @param sc Used to stop the SparkContext in case the listener thread dies. */ def start(sc: SparkContext) { if (started.compareAndSet(false, true)) { diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0d7c5cf2d27a7..bf732faf77705 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1145,6 +1145,8 @@ private[spark] object Utils extends Logging { /** * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the * default UncaughtExceptionHandler + * + * NOTE: This method is to be called by the spark-started JVM process. */ def tryOrExit(block: => Unit) { try { @@ -1156,14 +1158,24 @@ private[spark] object Utils extends Logging { } /** - * Execute a block of code that evaluates to Unit, stop SparkContext is any uncaught exception + * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught + * exception + * + * NOTE: This method is to be called by the driver-side components to avoid stopping the + * user-started JVM process completely; in contrast, tryOrExit is to be called in the + * spark-started JVM process . */ def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) { try { block } catch { case e: ControlThrowable => throw e - case t: Throwable => sc.stop() + case t: Throwable => + if (sc != null) { + logError(s"uncaught error in thread ${Thread.currentThread().getName}, stopping " + + "SparkContext", t) + sc.stop() + } } }