Skip to content

Commit

Permalink
[SPARK-3741] Make ConnectionManager propagate errors properly and add…
Browse files Browse the repository at this point in the history
… mo...

...re logs to avoid Executors swallowing errors

This PR made the following changes:
* Register a callback to `Connection` so that the error will be propagated properly.
* Add more logs so that the errors won't be swallowed by Executors.
* Use trySuccess/tryFailure because `Promise` doesn't allow to call success/failure more than once.

Author: zsxwing <zsxwing@gmail.com>

Closes #2593 from zsxwing/SPARK-3741 and squashes the following commits:

1d5aed5 [zsxwing] Fix naming
0b8a61c [zsxwing] Merge branch 'master' into SPARK-3741
764aec5 [zsxwing] [SPARK-3741] Make ConnectionManager propagate errors properly and add more logs to avoid Executors swallowing errors
  • Loading branch information
zsxwing authored and JoshRosen committed Oct 9, 2014
1 parent 1e0aa4d commit 73bf3f2
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 69 deletions.
35 changes: 21 additions & 14 deletions core/src/main/scala/org/apache/spark/network/nio/Connection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ package org.apache.spark.network.nio
import java.net._
import java.nio._
import java.nio.channels._
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.LinkedList

import org.apache.spark._

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.NonFatal

private[nio]
abstract class Connection(val channel: SocketChannel, val selector: Selector,
Expand All @@ -51,7 +54,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,

@volatile private var closed = false
var onCloseCallback: Connection => Unit = null
var onExceptionCallback: (Connection, Exception) => Unit = null
val onExceptionCallbacks = new ConcurrentLinkedQueue[(Connection, Throwable) => Unit]
var onKeyInterestChangeCallback: (Connection, Int) => Unit = null

val remoteAddress = getRemoteAddress()
Expand Down Expand Up @@ -130,20 +133,24 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector,
onCloseCallback = callback
}

def onException(callback: (Connection, Exception) => Unit) {
onExceptionCallback = callback
def onException(callback: (Connection, Throwable) => Unit) {
onExceptionCallbacks.add(callback)
}

def onKeyInterestChange(callback: (Connection, Int) => Unit) {
onKeyInterestChangeCallback = callback
}

def callOnExceptionCallback(e: Exception) {
if (onExceptionCallback != null) {
onExceptionCallback(this, e)
} else {
logError("Error in connection to " + getRemoteConnectionManagerId() +
" and OnExceptionCallback not registered", e)
def callOnExceptionCallbacks(e: Throwable) {
onExceptionCallbacks foreach {
callback =>
try {
callback(this, e)
} catch {
case NonFatal(e) => {
logWarning("Ignored error in onExceptionCallback", e)
}
}
}
}

Expand Down Expand Up @@ -323,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
} catch {
case e: Exception => {
logError("Error connecting to " + address, e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
}
}
}
Expand All @@ -348,7 +355,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
} catch {
case e: Exception => {
logWarning("Error finishing connection to " + address, e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
}
}
true
Expand Down Expand Up @@ -393,7 +400,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
} catch {
case e: Exception => {
logWarning("Error writing in connection to " + getRemoteConnectionManagerId(), e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
close()
return false
}
Expand All @@ -420,7 +427,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
case e: Exception =>
logError("Exception while reading SendingConnection to " + getRemoteConnectionManagerId(),
e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
close()
}

Expand Down Expand Up @@ -577,7 +584,7 @@ private[spark] class ReceivingConnection(
} catch {
case e: Exception => {
logWarning("Error reading from connection to " + getRemoteConnectionManagerId(), e)
callOnExceptionCallback(e)
callOnExceptionCallbacks(e)
close()
return false
}
Expand Down
Loading

0 comments on commit 73bf3f2

Please sign in to comment.