diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index b25016be4bea5..8d494929c4c8c 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -40,8 +40,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String var daemon: Process = null val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1)) var daemonPort: Int = 0 - var daemonWorkers = new mutable.WeakHashMap[Socket, Int]() - var idleWorkers = new mutable.Queue[Socket]() + val daemonWorkers = new mutable.WeakHashMap[Socket, Int]() + val idleWorkers = new mutable.Queue[Socket]() + var lastActivity = 0L + new MonitorThread().start() var simpleWorkers = new mutable.WeakHashMap[Socket, Process]() @@ -52,8 +54,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String def create(): Socket = { if (useDaemon) { - if (idleWorkers.length > 0) { - return idleWorkers.dequeue() + idleWorkers.synchronized { + if (idleWorkers.size > 0) { + return idleWorkers.dequeue() + } } createThroughDaemon() } else { @@ -203,6 +207,35 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String } } + /** + * Monitor all the idle workers, kill them after timeout. + */ + private class MonitorThread extends Thread(s"Idle Worker Monitor for $pythonExec") { + + setDaemon(true) + + override def run() { + while (true) { + idleWorkers.synchronized { + if (lastActivity + IDLE_WORKER_TIMEOUT_MS < System.currentTimeMillis()) { + while (idleWorkers.length > 0) { + val worker = idleWorkers.dequeue() + try { + // the Python worker will exit after closing the socket + worker.close() + } catch { + case e: Exception => + logWarning("Failed to close worker socket", e) + } + } + lastActivity = System.currentTimeMillis() + } + } + Thread.sleep(10000) + } + } + } + private def stopDaemon() { synchronized { if (useDaemon) { @@ -242,7 +275,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String def releaseWorker(worker: Socket) { if (useDaemon && envVars.get("SPARK_REUSE_WORKER").isDefined) { + idleWorkers.synchronized { + lastActivity = System.currentTimeMillis() idleWorkers.enqueue(worker) + } } else { // Cleanup the worker socket. This will also cause the Python worker to exit. try { @@ -257,4 +293,5 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String private object PythonWorkerFactory { val PROCESS_WAIT_TIMEOUT_MS = 10000 + val IDLE_WORKER_TIMEOUT_MS = 60000 } diff --git a/python/run-tests b/python/run-tests index f2d017ca99c8a..7b1ee3e1cddba 100755 --- a/python/run-tests +++ b/python/run-tests @@ -50,7 +50,7 @@ echo "Running PySpark tests. Output is in python/unit-tests.log." # Try to test with Python 2.6, since that's the minimum version that we support: if [ $(which python2.6) ]; then - export PYSPARK_PYTHON="pypy" + export PYSPARK_PYTHON="python2.6" fi echo "Testing with Python version:"