From cbc05bd3387b4cc656413829d6a0239806c86807 Mon Sep 17 00:00:00 2001 From: Anirudh Ramanathan Date: Fri, 24 Feb 2017 14:10:51 -0700 Subject: [PATCH] Richer logging and better error handling in driver pod watch (#154) * pod-watch progress around watch events * Simplify return * comments (cherry picked from commit d81c0845c28b895c5ebaaeca7911338f0ef5429e) --- .../kubernetes/LoggingPodStatusWatcher.scala | 43 ++++++++++++------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index b7a29fedcbd2d..17c3db8331ac4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -50,27 +50,30 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL } private var pod: Option[Pod] = Option.empty - private var prevPhase: String = null private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown") + private def status: String = pod.map(_.getStatus().getContainerStatuses().toString()) + .getOrElse("unknown") override def eventReceived(action: Action, pod: Pod): Unit = { this.pod = Option(pod) - - logShortStatus() - if (prevPhase != phase) { - logLongStatus() - } - prevPhase = phase - - if (phase == "Succeeded" || phase == "Failed") { - podCompletedFuture.countDown() - scheduler.shutdown() + action match { + case Action.DELETED => + closeWatch() + + case Action.ERROR => + closeWatch() + + case _ => + logLongStatus() + if (hasCompleted()) { + closeWatch() + } } } override def onClose(e: KubernetesClientException): Unit = { - scheduler.shutdown() - logDebug(s"Stopped watching application $appId with last-observed phase $phase") + logDebug(s"Stopping watching application $appId with last-observed phase $phase") + closeWatch() } private def logShortStatus() = { @@ -78,7 +81,16 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL } private def logLongStatus() = { - logInfo("Phase changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown")) + logInfo("State changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown")) + } + + private def hasCompleted(): Boolean = { + phase == "Succeeded" || phase == "Failed" + } + + private def closeWatch(): Unit = { + podCompletedFuture.countDown() + scheduler.shutdown() } private def formatPodState(pod: Pod): String = { @@ -103,7 +115,8 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL .asScala .map(_.getImage) .mkString(", ")), - ("phase", pod.getStatus.getPhase()) + ("phase", pod.getStatus.getPhase()), + ("status", pod.getStatus.getContainerStatuses().toString) ) // Use more loggable format if value is null or empty