Merge remote-tracking branch 'upstream/master'
str-janus committed Dec 10, 2014
2 parents 9156a57 + faa8fd8 commit 277d367
Showing 97 changed files with 976 additions and 1,345 deletions.
3 changes: 2 additions & 1 deletion LICENSE
@@ -646,7 +646,8 @@ THE SOFTWARE.

========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala:
========================================================================

Copyright (c) 2002-2013 EPFL
10 changes: 0 additions & 10 deletions assembly/pom.xml
@@ -169,16 +169,6 @@
</build>

<profiles>
<profile>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>yarn</id>
<dependencies>
7 changes: 7 additions & 0 deletions bin/spark-shell
@@ -45,6 +45,13 @@ source "$FWDIR"/bin/utils.sh
SUBMIT_USAGE_FUNCTION=usage
gatherSparkSubmitOpts "$@"

# SPARK-4161: Scala does not assume use of the Java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the Scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkJobInfo.java
@@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Jobs.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkJobInfo {
public interface SparkJobInfo extends Serializable {
int jobId();
int[] stageIds();
JobExecutionStatus status();
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Stages.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkStageInfo {
public interface SparkStageInfo extends Serializable {
int stageId();
int currentAttemptId();
long submissionTime();
Changes to the Spark web UI stylesheet (file header truncated)
@@ -171,6 +171,6 @@ span.additional-metric-title {

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .gc_time, .deserialization_time, .serialization_time, .getting_result_time {
.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {
display: none;
}
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong
import java.lang.ThreadLocal

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -278,10 +279,12 @@ object AccumulatorParam {

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
private[spark] object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
var lastId: Long = 0

def newId(): Long = synchronized {
@@ -293,22 +296,21 @@ private object Accumulators {
if (original) {
originals(a.id) = a
} else {
val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
accums(a.id) = a
localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
localAccums.remove(Thread.currentThread)
localAccums.get.clear
}
}

// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
}
return ret
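The Accumulators change above swaps the Thread-keyed map for a ThreadLocal whose initialValue() hands each thread its own map, so registering and clearing local accumulators never touches another thread's state. A minimal stand-alone sketch of that pattern, with illustrative names (not Spark's):

import scala.collection.mutable

object PerThreadRegistry {
  // Each thread lazily gets its own map the first time it calls local.get().
  private val local = new ThreadLocal[mutable.Map[Long, Any]]() {
    override protected def initialValue(): mutable.Map[Long, Any] = mutable.Map.empty
  }

  // Register a value for the calling thread only.
  def register(id: Long, value: Any): Unit = local.get()(id) = value

  // Snapshot of the calling thread's entries.
  def values: Map[Long, Any] = local.get().toMap

  // Clears only the calling thread's map; other threads are unaffected.
  def clear(): Unit = local.get().clear()
}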
18 changes: 13 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -357,8 +357,12 @@ class SparkContext(config: SparkConf) extends Logging {
}

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
if (dynamicAllocationEnabled) {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Dynamic allocation of executors is currently only supported in YARN mode")
Some(new ExecutorAllocationManager(this))
} else {
None
@@ -989,6 +993,8 @@ class SparkContext(config: SparkConf) extends Logging {
*/
@DeveloperApi
def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1005,6 +1011,8 @@
*/
@DeveloperApi
def killExecutors(executorIds: Seq[String]): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Killing executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
@@ -1758,7 +1766,7 @@ object SparkContext extends Logging {

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def writableWritableConverter[T <: Writable]() =
def writableWritableConverter[T <: Writable](): WritableConverter[T] =
WritableConverter.writableWritableConverter()

/**
@@ -2017,15 +2025,15 @@ object WritableConverter {
simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](bw =>
simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
// getBytes method returns an array which is longer than the data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
)
}
}

implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)

implicit def writableWritableConverter[T <: Writable]() =
implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
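For context on the dynamic-allocation guards added to SparkContext above: requestExecutors and killExecutors are @DeveloperApi methods, and the new asserts require a YARN master (or spark.dynamicAllocation.testing). A rough usage sketch under those assumptions; the app name, object name and executor ID are placeholders:

import org.apache.spark.{SparkConf, SparkContext}

object ElasticityDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("yarn-client")                       // the new asserts require a YARN master
      .setAppName("ElasticityDemo")
      .set("spark.dynamicAllocation.enabled", "true") // turns on the ExecutorAllocationManager
    val sc = new SparkContext(conf)

    // Ask the cluster manager for two more executors (the Boolean result reports
    // whether the scheduler backend accepted the request).
    sc.requestExecutors(2)

    // Hand one back later; "1" is a placeholder executor ID.
    sc.killExecutors(Seq("1"))

    sc.stop()
  }
}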
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -211,8 +211,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
}
@@ -221,10 +222,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
}

/**
@@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
implicit val ctag: ClassTag[K] = fakeClassTag
def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctag: ClassTag[U] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f))
}

62 changes: 60 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -19,7 +19,8 @@ package org.apache.spark.api.java

import com.google.common.base.Optional

import scala.collection.convert.Wrappers.MapWrapper
import java.{util => ju}
import scala.collection.mutable

private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
@@ -32,7 +33,64 @@ private[spark] object JavaUtils {
def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
new SerializableMapWrapper(underlying)

// Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
// but implements java.io.Serializable. It can't just be subclassed to make it
// Serializable, because Java deserialization requires the closest non-serializable
// superclass to have a no-arg constructor, which MapWrapper lacks; extending
// ju.AbstractMap (which has one) avoids the problem.
class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
extends MapWrapper(underlying) with java.io.Serializable
extends ju.AbstractMap[A, B] with java.io.Serializable { self =>

override def size = underlying.size

override def get(key: AnyRef): B = try {
underlying get key.asInstanceOf[A] match {
case None => null.asInstanceOf[B]
case Some(v) => v
}
} catch {
case ex: ClassCastException => null.asInstanceOf[B]
}

override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
def size = self.size

def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
val ui = underlying.iterator
var prev : Option[A] = None

def hasNext = ui.hasNext

def next() = {
val (k, v) = ui.next
prev = Some(k)
new ju.Map.Entry[A, B] {
import scala.util.hashing.byteswap32
def getKey = k
def getValue = v
def setValue(v1 : B) = self.put(k, v1)
override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
override def equals(other: Any) = other match {
case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
case _ => false
}
}
}

def remove() {
prev match {
case Some(k) =>
underlying match {
case mm: mutable.Map[a, _] =>
mm remove k
prev = None
case _ =>
throw new UnsupportedOperationException("remove")
}
case _ =>
throw new IllegalStateException("next must be called at least once before remove")
}
}
}
}
}
}
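A quick, hypothetical check of what SerializableMapWrapper buys: the map returned by mapAsSerializableJavaMap should survive a Java serialization round trip. Since JavaUtils is private[spark], the sketch below assumes it is compiled inside the org.apache.spark.api.java package; the object name is made up:

package org.apache.spark.api.java

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object SerializableMapWrapperCheck {
  def main(args: Array[String]): Unit = {
    val wrapped = JavaUtils.mapAsSerializableJavaMap(Map("a" -> 1, "b" -> 2))

    // Write the wrapper out; a map wrapper that is not java.io.Serializable fails here.
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(wrapped)
    out.close()

    // Read it back through the plain java.util.Map interface.
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val copy = in.readObject().asInstanceOf[java.util.Map[String, Int]]
    assert(copy.size == 2 && copy.get("a") == 1)
  }
}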
Changes to HistoryPage.scala (file header truncated)
@@ -26,6 +26,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

private val pageSize = 20
private val plusOrMinus = 2

def render(request: HttpServletRequest): Seq[Node] = {
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
@@ -39,6 +40,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val last = Math.min(actualFirst + pageSize, allApps.size) - 1
val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)

val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1

val appTable = UIUtils.listingTable(appHeader, appRow, apps)
val providerConfig = parent.getProviderConfig()
val content =
@@ -48,13 +52,38 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
// This displays the indices of pages that are within `plusOrMinus` pages of
// the current page. Regardless of where the current page is, this also links
// to the first and last page. If the current page minus `plusOrMinus` is greater
// than the second page, or the current page plus `plusOrMinus` is less than the
// second-to-last page, `...` is displayed for the elided range.
if (allApps.size > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _)
val rightSideIndices =
rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount)

<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
<span style="float: right">
{if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
{if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
</span>
<span style="float: right">
{
if (actualPage > 1) {
<a href={"/?page=" + (actualPage - 1)}>&lt; </a>
<a href={"/?page=1"}>1</a>
}
}
{if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
{leftSideIndices}
{actualPage}
{rightSideIndices}
{if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
{
if (actualPage < pageCount) {
<a href={"/?page=" + pageCount}>{pageCount}</a>
<a href={"/?page=" + (actualPage + 1)}> &gt;</a>
}
}
</span>
</h4> ++
appTable
} else {
@@ -81,6 +110,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Spark User",
"Last Updated")

private def rangeIndices(range: Seq[Int], condition: Int => Boolean): Seq[Node] = {
range.filter(condition).map(nextPage => <a href={"/?page=" + nextPage}> {nextPage} </a>)
}

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
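The HistoryPage change above boils down to a little window arithmetic around the current page; a stand-alone sketch of that computation with illustrative names (no HTML, not Spark code):

object PaginationWindow {
  // Pages shown around `actualPage`, plus whether an ellipsis is needed on each
  // side, for a pager that always links page 1 and page `pageCount`.
  def window(actualPage: Int, pageCount: Int, plusOrMinus: Int = 2)
      : (Seq[Int], Boolean, Seq[Int], Boolean) = {
    val secondPageFromLeft = 2
    val secondPageFromRight = pageCount - 1
    val leftSideIndices  = (actualPage - plusOrMinus until actualPage).filter(1 < _)
    val rightSideIndices = (actualPage + 1 to actualPage + plusOrMinus).filter(_ < pageCount)
    (leftSideIndices, actualPage - plusOrMinus > secondPageFromLeft,
      rightSideIndices, actualPage + plusOrMinus < secondPageFromRight)
  }

  def main(args: Array[String]): Unit = {
    // Page 7 of 12 with the default window: pages 5, 6, 8 and 9 are linked and
    // an ellipsis appears on both sides of the window.
    println(window(7, 12))
  }
}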
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -172,7 +172,6 @@ private[spark] class Executor(
val startGCTime = gcTime

try {
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
@@ -278,6 +277,8 @@ private[spark] class Executor(
env.shuffleMemoryManager.releaseMemoryForThisThread()
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
// Release memory used by this thread for accumulators
Accumulators.clear()
runningTasks.remove(taskId)
}
}
(Diffs for the remaining changed files are not shown here.)
