Merge remote-tracking branch 'upstream/master' into dtrunner-update
jkbradley committed Oct 11, 2014
2 parents ba567ab + 90f73fc commit 7f3d60f
Showing 46 changed files with 617 additions and 608 deletions.
51 changes: 38 additions & 13 deletions bin/pyspark
@@ -50,22 +50,47 @@ fi

. "$FWDIR"/bin/load-spark-env.sh

# Figure out which Python executable to use
# In Spark <= 1.1, setting IPYTHON=1 would cause the driver to be launched using the `ipython`
# executable, while the worker would still be launched using PYSPARK_PYTHON.
#
# In Spark 1.2, we removed the documentation of the IPYTHON and IPYTHON_OPTS variables and added
# PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS to allow IPython to be used for the driver.
# Now, users can simply set PYSPARK_DRIVER_PYTHON=ipython to use IPython and set
# PYSPARK_DRIVER_PYTHON_OPTS to pass options when starting the Python driver
# (e.g. PYSPARK_DRIVER_PYTHON_OPTS='notebook'). This supports full customization of the IPython
# and executor Python executables.
#
# For backwards-compatibility, we retain the old IPYTHON and IPYTHON_OPTS variables.

# Determine the Python executable to use if PYSPARK_PYTHON or PYSPARK_DRIVER_PYTHON isn't set:
if hash python2.7 2>/dev/null; then
# Attempt to use Python 2.7, if installed:
DEFAULT_PYTHON="python2.7"
else
DEFAULT_PYTHON="python"
fi

# Determine the Python executable to use for the driver:
if [[ -n "$IPYTHON_OPTS" || "$IPYTHON" == "1" ]]; then
# If IPython options are specified, assume user wants to run IPython
# (for backwards-compatibility)
PYSPARK_DRIVER_PYTHON_OPTS="$PYSPARK_DRIVER_PYTHON_OPTS $IPYTHON_OPTS"
PYSPARK_DRIVER_PYTHON="ipython"
elif [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"$DEFAULT_PYTHON"}"
fi

# Determine the Python executable to use for the executors:
if [[ -z "$PYSPARK_PYTHON" ]]; then
if [[ "$IPYTHON" = "1" || -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON="ipython"
if [[ $PYSPARK_DRIVER_PYTHON == *ipython* && $DEFAULT_PYTHON != "python2.7" ]]; then
echo "IPython requires Python 2.7+; please install python2.7 or set PYSPARK_PYTHON" 1>&2
exit 1
else
PYSPARK_PYTHON="python"
PYSPARK_PYTHON="$DEFAULT_PYTHON"
fi
fi
export PYSPARK_PYTHON

if [[ -z "$PYSPARK_PYTHON_OPTS" && -n "$IPYTHON_OPTS" ]]; then
# for backward compatibility
PYSPARK_PYTHON_OPTS="$IPYTHON_OPTS"
fi

# Add the PySpark classes to the Python path:
export PYTHONPATH="$SPARK_HOME/python/:$PYTHONPATH"
export PYTHONPATH="$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
@@ -93,9 +118,9 @@ if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
if [[ -n "$PYSPARK_DOC_TEST" ]]; then
exec "$PYSPARK_PYTHON" -m doctest $1
exec "$PYSPARK_DRIVER_PYTHON" -m doctest $1
else
exec "$PYSPARK_PYTHON" $1
exec "$PYSPARK_DRIVER_PYTHON" $1
fi
exit
fi
@@ -111,5 +136,5 @@ if [[ "$1" =~ \.py$ ]]; then
else
# PySpark shell requires special handling downstream
export PYSPARK_SHELL=1
exec "$PYSPARK_PYTHON" $PYSPARK_PYTHON_OPTS
exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
fi
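
The comment block in this hunk boils down to a fixed resolution order: the legacy IPYTHON and IPYTHON_OPTS variables force an IPython driver, otherwise PYSPARK_DRIVER_PYTHON is used, then PYSPARK_PYTHON, then the default interpreter. A minimal Scala sketch of that order follows; the object and method names are hypothetical and the sketch is not part of the patch.

// Hypothetical summary of the resolution order bin/pyspark implements above.
object PythonExecResolution {
  /** Returns (driver executable, worker executable) for a given environment. */
  def resolve(env: Map[String, String], defaultPython: String = "python"): (String, String) = {
    // Backwards compatibility: IPYTHON=1 or a non-empty IPYTHON_OPTS forces an IPython driver.
    val legacyIPython = env.get("IPYTHON").exists(_ == "1") || env.get("IPYTHON_OPTS").exists(_.nonEmpty)
    val workerPython = env.getOrElse("PYSPARK_PYTHON", defaultPython)
    val driverPython =
      if (legacyIPython) "ipython"
      else env.getOrElse("PYSPARK_DRIVER_PYTHON", workerPython)
    (driverPython, workerPython)
  }

  def main(args: Array[String]): Unit = {
    // Driver runs IPython while the executors keep the plain interpreter.
    println(resolve(Map("PYSPARK_DRIVER_PYTHON" -> "ipython")))  // (ipython,python)
    // The legacy IPYTHON=1 switch still selects IPython for the driver.
    println(resolve(Map("IPYTHON" -> "1")))                       // (ipython,python)
  }
}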
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
import com.google.common.io.Files

import org.apache.spark.util.Utils

/**
* Utilities for tests. Included in main codebase since it's used by multiple
* projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
* in order to avoid interference between tests.
*/
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
val tempDir = Files.createTempDir()
tempDir.deleteOnExit()
val tempDir = Utils.createTempDir()
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
createJar(files, jarFile)
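
The substitution here, Guava's Files.createTempDir() plus deleteOnExit() replaced by Utils.createTempDir(), recurs in several test suites later in this commit. Utils.createTempDir registers the directory with Spark's shutdown-hook cleanup, whereas File.deleteOnExit cannot remove a non-empty directory. A sketch of the new pattern, assuming spark-core is on the classpath (Utils is private[spark], so the sketch sits under that package):

package org.apache.spark.example

import java.io.File
import org.apache.spark.util.Utils

// Illustrative only: shows the temp-dir pattern this commit standardizes on.
object TempDirSketch {
  def main(args: Array[String]): Unit = {
    // Registered for recursive deletion by Spark's JVM shutdown hook (see the Utils.scala diff below).
    val tempDir: File = Utils.createTempDir()
    val scratch = new File(tempDir, "scratch.jar")
    println(s"Writing test artifacts under ${tempDir.getAbsolutePath} (e.g. ${scratch.getName})")
  }
}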
core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -108,10 +108,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
val worker = pb.start()

// Redirect worker stdout and stderr
@@ -149,10 +151,12 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
workerEnv.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
workerEnv.put("PYTHONUNBUFFERED", "YES")
daemon = pb.start()

val in = new DataInputStream(daemon.getInputStream)
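
The -u interpreter flag is replaced by the PYTHONUNBUFFERED environment variable because the ipython executable does not accept -u, while the variable gives the same unbuffered output. A self-contained sketch of that launch pattern; the object name is hypothetical and a python executable on the PATH is assumed:

import scala.collection.JavaConversions._

// Hypothetical stand-alone launcher mirroring the ProcessBuilder change above.
object UnbufferedLauncher {
  def launch(pythonExec: String, module: String): Process = {
    val pb = new ProcessBuilder(Seq(pythonExec, "-m", module))
    // Equivalent to passing -u, but also works for ipython, which rejects the flag.
    pb.environment().put("PYTHONUNBUFFERED", "YES")
    pb.redirectErrorStream(true)
    pb.start()
  }

  def main(args: Array[String]): Unit = {
    // Prints the Zen of Python if a `python` executable is on the PATH.
    val proc = launch("python", "this")
    scala.io.Source.fromInputStream(proc.getInputStream).getLines().foreach(println)
    proc.waitFor()
  }
}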
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -130,7 +130,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf)
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
System.exit(-1)

case AssociationErrorEvent(cause, _, remoteAddress, _) =>
case AssociationErrorEvent(cause, _, remoteAddress, _, _) =>
println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.")
println(s"Cause was: $cause")
System.exit(-1)
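
This pattern match, and the matching changes in AppClient and WorkerWatcher below, gain a trailing wildcard because Akka's AssociationErrorEvent appears to carry an extra field in the Akka version this merge builds against. The underlying Scala rule, shown with a hypothetical case class rather than Akka's own:

// Hypothetical case class standing in for the Akka remoting event; not Akka code.
final case class ErrorEvent(cause: Throwable, local: String, remote: String, inbound: Boolean, level: Int)

object ArityExample {
  // A constructor pattern must account for every field, so when the class grows
  // from four fields to five, the match gains a trailing `_`, as in the diffs here.
  def describe(e: ErrorEvent): String = e match {
    case ErrorEvent(cause, _, remote, _, _) => s"Error connecting to $remote, cause: $cause"
  }

  def main(args: Array[String]): Unit =
    println(describe(ErrorEvent(new RuntimeException("boom"), "local", "remote", inbound = true, level = 1)))
}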
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -34,7 +34,8 @@ object PythonRunner {
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
val pythonExec =
sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
@@ -57,6 +58,7 @@ object PythonRunner {
val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -154,7 +154,7 @@ private[spark] class AppClient(
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()

case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) =>
case AssociationErrorEvent(cause, _, address, _, _) if isPossibleMaster(address) =>
logWarning(s"Could not connect to $address: $cause")

case StopAppClient =>
core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -54,7 +54,7 @@ private[spark] class WorkerWatcher(workerUrl: String)
case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) =>
logInfo(s"Successfully connected to $workerUrl")

case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound)
case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound, _)
if isWorker(remoteAddress) =>
// These logs may not be seen if the worker (and associated pipe) has died
logError(s"Could not initialize connection to worker $workerUrl. Exiting.")
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala
@@ -66,13 +66,27 @@ sealed abstract class ManagedBuffer {
final class FileSegmentManagedBuffer(val file: File, val offset: Long, val length: Long)
extends ManagedBuffer {

/**
* Memory mapping is expensive and can destabilize the JVM (SPARK-1145, SPARK-3889).
* Avoid unless there's a good reason not to.
*/
private val MIN_MEMORY_MAP_BYTES = 2 * 1024 * 1024;

override def size: Long = length

override def nioByteBuffer(): ByteBuffer = {
var channel: FileChannel = null
try {
channel = new RandomAccessFile(file, "r").getChannel
channel.map(MapMode.READ_ONLY, offset, length)
// Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
if (length < MIN_MEMORY_MAP_BYTES) {
val buf = ByteBuffer.allocate(length.toInt)
channel.read(buf, offset)
buf.flip()
buf
} else {
channel.map(MapMode.READ_ONLY, offset, length)
}
} catch {
case e: IOException =>
Try(channel.size).toOption match {
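
Segments smaller than 2 MB are now copied with a plain read instead of being memory-mapped, since each mapping carries fixed overhead and mapping has destabilized the JVM before (SPARK-1145, SPARK-3889). A stand-alone sketch of the same decision; the object name is hypothetical and no Spark types are used:

import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

// Hypothetical, reduced version of the read-vs-mmap choice in FileSegmentManagedBuffer.
object SegmentReader {
  private val MinMemoryMapBytes = 2 * 1024 * 1024  // same 2 MB cutoff as the patch

  def read(file: File, offset: Long, length: Long): ByteBuffer = {
    val channel = new RandomAccessFile(file, "r").getChannel
    try {
      if (length < MinMemoryMapBytes) {
        // Small segment: copy into a heap buffer; a single positional read is assumed
        // to fill it, as in the patch.
        val buf = ByteBuffer.allocate(length.toInt)
        channel.read(buf, offset)
        buf.flip()
        buf
      } else {
        // Large segment: memory-map the region read-only; the mapping outlives the channel.
        channel.map(MapMode.READ_ONLY, offset, length)
      }
    } finally {
      channel.close()
    }
  }
}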
55 changes: 38 additions & 17 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -168,6 +168,20 @@ private[spark] object Utils extends Logging {
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

// Add a shutdown hook to delete the temp dirs when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
}
})

// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
val absolutePath = file.getAbsolutePath()
@@ -252,14 +266,6 @@ private[spark] object Utils extends Logging {
}

registerShutdownDeleteDir(dir)

// Add a shutdown hook to delete the temp dir when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
override def run() {
// Attempt to delete if some patch which is parent of this is not already registered.
if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
}
})
dir
}

@@ -666,15 +672,30 @@ private[spark] object Utils extends Logging {
*/
def deleteRecursively(file: File) {
if (file != null) {
if (file.isDirectory() && !isSymlink(file)) {
for (child <- listFilesSafely(file)) {
deleteRecursively(child)
try {
if (file.isDirectory && !isSymlink(file)) {
var savedIOException: IOException = null
for (child <- listFilesSafely(file)) {
try {
deleteRecursively(child)
} catch {
// In case of multiple exceptions, only last one will be thrown
case ioe: IOException => savedIOException = ioe
}
}
if (savedIOException != null) {
throw savedIOException
}
shutdownDeletePaths.synchronized {
shutdownDeletePaths.remove(file.getAbsolutePath)
}
}
}
if (!file.delete()) {
// Delete can also fail if the file simply did not exist
if (file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath)
} finally {
if (!file.delete()) {
// Delete can also fail if the file simply did not exist
if (file.exists()) {
throw new IOException("Failed to delete: " + file.getAbsolutePath)
}
}
}
}
@@ -713,7 +734,7 @@ private[spark] object Utils extends Logging {
*/
def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
if (!dir.isDirectory) {
throw new IllegalArgumentException("$dir is not a directory!")
throw new IllegalArgumentException(s"$dir is not a directory!")
}
val filesAndDirs = dir.listFiles()
val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)
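
Two related changes land in Utils.scala: a single shutdown hook now walks every registered temp dir instead of one hook per directory, and deleteRecursively keeps deleting siblings after a child fails, rethrows only the last IOException, and still attempts to remove the file or directory itself in a finally block. A reduced sketch of that control flow, with hypothetical names and the symlink check omitted:

import java.io.{File, IOException}
import scala.collection.mutable

// Hypothetical, reduced version of the temp-dir bookkeeping changed above (symlink check omitted).
object TempDirCleanup {
  private val registeredPaths = mutable.HashSet[String]()

  // One hook for every registered directory, rather than one hook per directory.
  Runtime.getRuntime.addShutdownHook(new Thread("delete temp dirs") {
    override def run(): Unit = {
      // Iterate over a snapshot, since deleteRecursively unregisters paths as it goes.
      val snapshot = registeredPaths.synchronized { registeredPaths.toList }
      snapshot.foreach { p =>
        try deleteRecursively(new File(p)) catch {
          case e: Exception => System.err.println(s"Exception while deleting temp dir $p: $e")
        }
      }
    }
  })

  def register(dir: File): Unit =
    registeredPaths.synchronized { registeredPaths += dir.getAbsolutePath }

  def deleteRecursively(file: File): Unit = {
    try {
      if (file.isDirectory) {
        var lastError: IOException = null
        // Keep deleting siblings past a failing child; only the last failure is rethrown.
        for (child <- Option(file.listFiles()).getOrElse(Array.empty[File])) {
          try deleteRecursively(child) catch { case e: IOException => lastError = e }
        }
        if (lastError != null) throw lastError
        registeredPaths.synchronized { registeredPaths -= file.getAbsolutePath }
      }
    } finally {
      // Attempt the delete even if a child failed; only complain if the path still exists.
      if (!file.delete() && file.exists()) {
        throw new IOException("Failed to delete: " + file.getAbsolutePath)
      }
    }
  }
}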
4 changes: 1 addition & 3 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark
import java.io._
import java.util.jar.{JarEntry, JarOutputStream}

import com.google.common.io.Files
import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
@@ -41,8 +40,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
override def beforeAll() {
super.beforeAll()

tmpDir = Files.createTempDir()
tmpDir.deleteOnExit()
tmpDir = Utils.createTempDir()
val testTempDir = new File(tmpDir, "test")
testTempDir.mkdir()

4 changes: 1 addition & 3 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}

import scala.io.Source

import com.google.common.io.Files
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress.DefaultCodec
import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
@@ -39,8 +38,7 @@ class FileSuite extends FunSuite with LocalSparkContext {

override def beforeEach() {
super.beforeEach()
tempDir = Files.createTempDir()
tempDir.deleteOnExit()
tempDir = Utils.createTempDir()
}

override def afterEach() {
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -146,7 +146,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val masterTracker = new MapOutputTrackerMaster(conf)
val actorSystem = ActorSystem("test")
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~123B, and no exception should be thrown
@@ -164,7 +164,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val masterTracker = new MapOutputTrackerMaster(conf)
val actorSystem = ActorSystem("test")
val actorRef = TestActorRef[MapOutputTrackerMasterActor](
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
Props(new MapOutputTrackerMasterActor(masterTracker, newConf)))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
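
The Props(...) wrapper appears to be required by the Akka TestKit version this merge targets, whose TestActorRef factory takes a Props value rather than a bare constructor expression. A minimal sketch of the construction with a throwaway actor, assuming akka-actor and akka-testkit are on the classpath:

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.TestActorRef

// Hypothetical minimal actor, only to show the TestActorRef construction pattern used above.
class EchoActor extends Actor {
  var lastMessage: Any = null
  def receive = { case msg => lastMessage = msg }
}

object TestActorRefSketch {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("test")
    // Wrap the constructor in Props, as the suite now does for MapOutputTrackerMasterActor.
    val ref = TestActorRef[EchoActor](Props(new EchoActor))
    ref ! "ping"
    // TestActorRef processes messages synchronously, so the state is visible immediately.
    assert(ref.underlyingActor.lastMessage == "ping")
    system.shutdown()
  }
}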
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.util.Utils
import org.scalatest.FunSuite
import org.scalatest.Matchers
import com.google.common.io.Files

class SparkSubmitSuite extends FunSuite with Matchers {
def beforeAll() {
@@ -332,7 +331,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
}

def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
val tmpDir = Files.createTempDir()
val tmpDir = Utils.createTempDir()

val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
core/src/test/scala/org/apache/spark/input/WholeTextFileRecordReaderSuite.scala
@@ -23,8 +23,6 @@ import java.io.FileOutputStream

import scala.collection.immutable.IndexedSeq

import com.google.common.io.Files

import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite

@@ -66,9 +64,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
* 3) Does the contents be the same.
*/
test("Correctness of WholeTextFileRecordReader.") {

val dir = Files.createTempDir()
dir.deleteOnExit()
val dir = Utils.createTempDir()
println(s"Local disk address is ${dir.toString}.")

WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>