Update #7

Merged: 36 commits, Sep 16, 2014.

Commits
b8634df
[SPARK-3160] [SPARK-3494] [mllib] DecisionTree: eliminate pre-alloca…
jkbradley Sep 12, 2014
f116f76
[SPARK-2558][DOCS] Add --queue example to YARN doc
kramimus Sep 12, 2014
5333776
[PySpark] Add blank line so that Python RDD.top() docstring renders c…
rnowling Sep 12, 2014
8194fc6
[SPARK-3481] [SQL] Eliminate the error log in local Hive comparison test
chenghao-intel Sep 12, 2014
eae81b0
MAINTENANCE: Automated closing of pull requests.
pwendell Sep 12, 2014
15a5645
[SPARK-3427] [GraphX] Avoid active vertex tracking in static PageRank
ankurdave Sep 12, 2014
1d76796
SPARK-3014. Log more informative messages in a couple failure scena…
sryza Sep 12, 2014
af25838
[SPARK-3217] Add Guava to classpath when SPARK_PREPEND_CLASSES is set.
Sep 12, 2014
25311c2
[SPARK-3456] YarnAllocator on alpha can lose container requests to RM
tgravescs Sep 13, 2014
71af030
[SPARK-3094] [PySpark] compatible with PyPy
davies Sep 13, 2014
885d162
[SPARK-3500] [SQL] use JavaSchemaRDD as SchemaRDD._jschema_rdd
davies Sep 13, 2014
6d887db
[SPARK-3515][SQL] Moves test suite setup code to beforeAll rather tha…
liancheng Sep 13, 2014
2584ea5
[SPARK-3469] Make sure all TaskCompletionListener are called even wit…
rxin Sep 13, 2014
e11eeb7
[SQL][Docs] Update SQL programming guide to show the correct default …
yhuai Sep 13, 2014
feaa370
SPARK-3470 [CORE] [STREAMING] Add Closeable / close() to Java context…
srowen Sep 13, 2014
b4dded4
Proper indent for the previous commit.
rxin Sep 13, 2014
a523cea
[SQL] [Docs] typo fixes
nchammas Sep 13, 2014
184cd51
[SPARK-3481][SQL] Removes the evil MINOR HACK
liancheng Sep 13, 2014
7404924
[SPARK-3294][SQL] Eliminates boxing costs from in-memory columnar sto…
liancheng Sep 13, 2014
0f8c4ed
[SQL] Decrease partitions when testing
marmbrus Sep 13, 2014
2aea0da
[SPARK-3030] [PySpark] Reuse Python worker
davies Sep 13, 2014
4e3fbe8
[SPARK-3463] [PySpark] aggregate and show spilled bytes in Python
davies Sep 14, 2014
c243b21
SPARK-3039: Allow spark to be built using avro-mapred for hadoop2
bbossy Sep 15, 2014
f493f79
[SPARK-3452] Maven build should skip publishing artifacts people shou…
ScrapCodes Sep 15, 2014
cc14644
[SPARK-3410] The priority of shutdownhook for ApplicationMaster shoul…
sarutak Sep 15, 2014
fe2b1d6
[SPARK-3425] do not set MaxPermSize for OpenJDK 1.8
Sep 15, 2014
e59fac1
[SPARK-3518] Remove wasted statement in JsonProtocol
sarutak Sep 15, 2014
37d9252
[SPARK-2714] DAGScheduler logs jobid when runJob finishes
YanTangZhai Sep 15, 2014
3b93128
[SPARK-3396][MLLIB] Use SquaredL2Updater in LogisticRegressionWithSGD
BigCrunsh Sep 16, 2014
983d6a9
[MLlib] Update SVD documentation in IndexedRowMatrix
rezazadeh Sep 16, 2014
fdb302f
[SPARK-3516] [mllib] DecisionTree: Add minInstancesPerNode, minInfoGa…
Sep 16, 2014
da33acb
[SPARK-2951] [PySpark] support unpickle array.array for Python 2.6
davies Sep 16, 2014
60050f4
[SPARK-1087] Move python traceback utilities into new traceback_utils…
staple Sep 16, 2014
d428ac6
[SPARK-3540] Add reboot-slaves functionality to the ec2 script
rxin Sep 16, 2014
ecf0c02
[SPARK-3433][BUILD] Fix for Mima false-positives with @DeveloperAPI a…
ScrapCodes Sep 16, 2014
febafef
[SPARK-3040] pick up a more proper local ip address for Utils.findLoc…
advancedxy Sep 16, 2014

Files changed
14 changes: 14 additions & 0 deletions assembly/pom.xml
@@ -88,6 +88,20 @@

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-install-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<!-- Use the shade plugin to create a big JAR with all the dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
1 change: 1 addition & 0 deletions bin/compute-classpath.sh
@@ -43,6 +43,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
echo "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark"\
"classes ahead of assembly." >&2
CLASSPATH="$CLASSPATH:$FWDIR/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/core/target/jars/*"
CLASSPATH="$CLASSPATH:$FWDIR/repl/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/mllib/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/bagel/target/scala-$SCALA_VERSION/classes"
2 changes: 1 addition & 1 deletion bin/spark-class
@@ -105,7 +105,7 @@ else
exit 1
fi
fi
-JAVA_VERSION=$("$RUNNER" -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+JAVA_VERSION=$("$RUNNER" -version 2>&1 | sed 's/.* version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')

# Set JAVA_OPTS to be able to load native libraries and to set heap size
if [ "$JAVA_VERSION" -ge 18 ]; then
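
The one-character-class change above is the whole fix for SPARK-3425: OpenJDK prints `openjdk version "1.8.0_20"` where Oracle's JVM prints `java version "1.8.0_20"`, so a pattern anchored on the literal `java version` never matched OpenJDK banners and the `-ge 18` check below could not work there. A minimal Scala sketch of the same parsing rule (illustrative only; the shipped logic stays in sed inside bin/spark-class):

import scala.util.matching.Regex

// Keep the major and minor digits of any '<vendor> version "X.Y.Z"' banner.
val versionPattern: Regex = ".* version \"(\\d+)\\.(\\d+)\\..*\"".r

def majorMinor(bannerLine: String): Option[Int] = bannerLine match {
  case versionPattern(major, minor) => Some((major + minor).toInt) // "1","8" -> 18
  case _ => None
}

// Both vendor banners now yield 18, so the ">= 18" test behaves identically:
assert(majorMinor("java version \"1.8.0_20\"") == Some(18))
assert(majorMinor("openjdk version \"1.8.0_20\"") == Some(18))
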
27 changes: 27 additions & 0 deletions core/pom.xml
@@ -351,6 +351,33 @@
</execution>
</executions>
</plugin>
<!--
Copy guava to the build directory. This is needed to make the SPARK_PREPEND_CLASSES
option work in compute-classpath.sh, since it would put the non-shaded Spark classes in
the runtime classpath.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<useSubDirectoryPerType>true</useSubDirectoryPerType>
<includeArtifactIds>guava</includeArtifactIds>
<silent>true</silent>
</configuration>
</execution>
</executions>
</plugin>
</plugins>

<resources>
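
For context: Guava is shaded into the assembly jar, so when SPARK_PREPEND_CLASSES puts the locally compiled, non-shaded classes first, Guava would otherwise be missing at runtime. The jar copied into core/target/jars by this plugin, picked up by the compute-classpath.sh line above, fills that hole. A quick, hypothetical classpath check from a Scala shell (not part of this PR):

// Hypothetical sanity check: the Java API references com.google.common.base.Optional,
// so it must be loadable in a SPARK_PREPEND_CLASSES session.
def guavaVisible: Boolean =
  try { Class.forName("com.google.common.base.Optional"); true }
  catch { case _: ClassNotFoundException => false }

println(s"Guava on classpath: $guavaVisible")
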
3 changes: 0 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1072,11 +1072,8 @@ class SparkContext(config: SparkConf) extends Logging {
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
-val start = System.nanoTime
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
-logInfo(
-  "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
rdd.doCheckpoint()
}

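
The removed logInfo is not lost: per SPARK-2714 (commit 37d9252 in the list above), DAGScheduler now emits the timing message when runJob finishes and can tag it with the job id, which SparkContext never knew. Roughly the shape of the relocated log, as a sketch (names approximated, not the actual DAGScheduler code):

val start = System.nanoTime
// ... DAGScheduler executes the job here ...
val seconds = (System.nanoTime - start) / 1e9
println(f"Job 0 finished: took $seconds%.6f s") // now includes the job id
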
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -108,6 +108,14 @@ class SparkEnv (
pythonWorkers.get(key).foreach(_.stopWorker(worker))
}
}

private[spark]
def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers.get(key).foreach(_.releaseWorker(worker))
}
}
}

object SparkEnv extends Logging {
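
releasePythonWorker is the pooled counterpart of the stopWorker path shown above: workers are keyed by (pythonExec, envVars), and a released worker returns to the pool for the next task instead of being killed. A sketch of the intended pairing, simplified from the caller in PythonRDD.compute (both methods are private[spark], so this shows shape, not public API):

import org.apache.spark.SparkEnv

val env = SparkEnv.get
val envVars = Map("SPARK_REUSE_WORKER" -> "1") // illustrative key material
val worker = env.createPythonWorker("python", envVars)
try {
  // ... stream one partition through the worker ...
} finally {
  // The (pythonExec, envVars) key must match the one used at creation time,
  // or the pool lookup finds nothing to release.
  env.releasePythonWorker("python", envVars, worker)
}
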
18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{TaskCompletionListenerException, TaskCompletionListener}


/**
@@ -41,7 +41,7 @@ class TaskContext(
val attemptId: Long,
val runningLocally: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty)
-extends Serializable {
+extends Serializable with Logging {

@deprecated("use partitionId", "0.8.1")
def splitId = partitionId
@@ -103,8 +103,20 @@ class TaskContext(
/** Marks the task as completed and triggers the listeners. */
private[spark] def markTaskCompleted(): Unit = {
completed = true
+val errorMsgs = new ArrayBuffer[String](2)
// Process complete callbacks in the reverse order of registration
-onCompleteCallbacks.reverse.foreach { _.onTaskCompletion(this) }
+onCompleteCallbacks.reverse.foreach { listener =>
+  try {
+    listener.onTaskCompletion(this)
+  } catch {
+    case e: Throwable =>
+      errorMsgs += e.getMessage
+      logError("Error in TaskCompletionListener", e)
+  }
+}
+if (errorMsgs.nonEmpty) {
+  throw new TaskCompletionListenerException(errorMsgs)
+}
}

/** Marks the task for interruption, i.e. cancellation. */
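
The net effect of the TaskContext change: every completion listener now runs even if an earlier one throws, each failure is logged as it happens, and a single TaskCompletionListenerException carrying all collected messages is thrown at the end. A small sketch of the semantics (hypothetical listeners; a real TaskContext comes from the scheduler, not user code):

import org.apache.spark.TaskContext

def registerListeners(context: TaskContext): Unit = {
  context.addTaskCompletionListener { _ => println("cleanup A") }
  context.addTaskCompletionListener { _ => sys.error("listener B failed") }
  context.addTaskCompletionListener { _ => println("cleanup C") }
}
// markTaskCompleted() runs listeners in reverse registration order:
// C, then B (its exception is caught, logged, and recorded), then A.
// Only after all three have run does it throw
// TaskCompletionListenerException with B's message.
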
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -17,6 +17,7 @@

package org.apache.spark.api.java

import java.io.Closeable
import java.util
import java.util.{Map => JMap}

@@ -40,7 +41,9 @@ import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
* [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
*/
-class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
+class JavaSparkContext(val sc: SparkContext)
+  extends JavaSparkContextVarargsWorkaround with Closeable {

/**
* Create a JavaSparkContext that loads settings from system properties (for instance, when
* launching with ./bin/spark-submit).
@@ -534,6 +537,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.stop()
}

override def close(): Unit = stop()

/**
* Get Spark's home location from either a value set through the constructor,
* or the spark.home Java property, or the SPARK_HOME environment variable
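
Because close() simply delegates to stop(), a JavaSparkContext can now be managed as a resource: Java 7 callers get try-with-resources, and Scala callers can use an ordinary try/finally. A minimal sketch, assuming a local master (not code from this PR):

import java.util.Arrays
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext("local[2]", "closeable-demo")
try {
  println(jsc.parallelize(Arrays.asList(1, 2, 3)).count()) // 3
} finally {
  jsc.close() // identical to jsc.stop()
}
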
62 changes: 50 additions & 12 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,6 +23,7 @@ import java.nio.charset.Charset
import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag
import scala.util.{Try, Success, Failure}
@@ -52,6 +53,7 @@ private[spark] class PythonRDD(
extends RDD[Array[Byte]](parent) {

val bufferSize = conf.getInt("spark.buffer.size", 65536)
val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)

override def getPartitions = parent.partitions

@@ -63,19 +65,26 @@
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
if (reuse_worker) {
envVars += ("SPARK_REUSE_WORKER" -> "1")
}
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)

var complete_cleanly = false
context.addTaskCompletionListener { context =>
writerThread.shutdownOnTaskCompletion()

-// Cleanup the worker socket. This will also cause the Python worker to exit.
-try {
-  worker.close()
-} catch {
-  case e: Exception => logWarning("Failed to close worker socket", e)
+if (reuse_worker && complete_cleanly) {
+  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+} else {
+  try {
+    worker.close()
+  } catch {
+    case e: Exception =>
+      logWarning("Failed to close worker socket", e)
+  }
+}
}

@@ -115,6 +124,10 @@
val total = finishTime - startTime
logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
init, finish))
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += diskBytesSpilled
read()
case SpecialLengths.PYTHON_EXCEPTION_THROWN =>
// Signals that an exception has been thrown in python
Expand All @@ -133,6 +146,7 @@ private[spark] class PythonRDD(
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
complete_cleanly = true
null
}
} catch {
@@ -195,29 +209,45 @@
PythonRDD.writeUTF(include, dataOut)
}
// Broadcast variables
-dataOut.writeInt(broadcastVars.length)
+val oldBids = PythonRDD.getWorkerBroadcasts(worker)
+val newBids = broadcastVars.map(_.id).toSet
+// number of different broadcasts
+val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+dataOut.writeInt(cnt)
+for (bid <- oldBids) {
+  if (!newBids.contains(bid)) {
+    // remove the broadcast from worker
+    dataOut.writeLong(-bid - 1) // bid >= 0
+    oldBids.remove(bid)
+  }
+}
for (broadcast <- broadcastVars) {
-dataOut.writeLong(broadcast.id)
-dataOut.writeInt(broadcast.value.length)
-dataOut.write(broadcast.value)
+if (!oldBids.contains(broadcast.id)) {
+  // send new broadcast
+  dataOut.writeLong(broadcast.id)
+  dataOut.writeInt(broadcast.value.length)
+  dataOut.write(broadcast.value)
+  oldBids.add(broadcast.id)
+}
}
dataOut.flush()
// Serialized command:
dataOut.writeInt(command.length)
dataOut.write(command)
// Data values
PythonRDD.writeIteratorToStream(parent.iterator(split, context), dataOut)
dataOut.writeInt(SpecialLengths.END_OF_DATA_SECTION)
dataOut.flush()
} catch {
case e: Exception if context.isCompleted || context.isInterrupted =>
logDebug("Exception thrown after task completion (likely due to cleanup)", e)
worker.shutdownOutput()

case e: Exception =>
// We must avoid throwing exceptions here, because the thread uncaught exception handler
// will kill the whole executor (see org.apache.spark.executor.Executor).
_exception = e
} finally {
-Try(worker.shutdownOutput()) // kill Python worker process
+worker.shutdownOutput()
}
}
}
@@ -278,6 +308,14 @@
private[spark] object PythonRDD extends Logging {
val UTF8 = Charset.forName("UTF-8")

// remember the broadcasts sent to each worker
private val workerBroadcasts = new mutable.WeakHashMap[Socket, mutable.Set[Long]]()
private def getWorkerBroadcasts(worker: Socket) = {
synchronized {
workerBroadcasts.getOrElseUpdate(worker, new mutable.HashSet[Long]())
}
}

/**
* Adapter for calling SparkContext#runJob from Python.
*
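
Taken together, the PythonRDD changes make worker reuse the default: a worker socket is handed back through releasePythonWorker only when its task ends cleanly (complete_cleanly above), and the per-socket bookkeeping in workerBroadcasts lets the writer send only new broadcasts, plus retraction ids for dropped ones, instead of re-shipping everything to a fresh process. The JVM-side switch, as a sketch (assumes a local master; the property name comes from the diff above):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("python-worker-reuse")
  .set("spark.python.worker.reuse", "false") // opt out: fresh worker per task, as before
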