Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into yarn-settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed Dec 10, 2014
2 parents dcd1316 + 2b9b726 commit 36e0753
Show file tree
Hide file tree
Showing 151 changed files with 3,697 additions and 1,920 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*.ipr
*.iml
*.iws
*.pyc
.idea/
.idea_modules/
sbt/*.jar
Expand Down Expand Up @@ -49,6 +50,8 @@ dependency-reduced-pom.xml
checkpoint
derby.log
dist/
dev/create-release/*txt
dev/create-release/*new
spark-*-bin-*.tgz
unit-tests.log
/lib/
Expand Down
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,8 @@ THE SOFTWARE.

========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala:
========================================================================

Copyright (c) 2002-2013 EPFL
Expand Down
10 changes: 0 additions & 10 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,6 @@
</build>

<profiles>
<profile>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>yarn</id>
<dependencies>
Expand Down
21 changes: 21 additions & 0 deletions bin/beeline.cmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
@echo off

rem
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements. See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License. You may obtain a copy of the License at
rem
rem http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.
rem

set SPARK_HOME=%~dp0..
cmd /V /E /C %SPARK_HOME%\bin\spark-class.cmd org.apache.hive.beeline.BeeLine %*
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkJobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Jobs.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkJobInfo {
public interface SparkJobInfo extends Serializable {
int jobId();
int[] stageIds();
JobExecutionStatus status();
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkStageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Stages.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkStageInfo {
public interface SparkStageInfo extends Serializable {
int stageId();
int currentAttemptId();
long submissionTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ span.additional-metric-title {

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .gc_time, .deserialization_time, .serialization_time, .getting_result_time {
.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {
display: none;
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1758,7 +1758,7 @@ object SparkContext extends Logging {

@deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " +
"backward compatibility.", "1.3.0")
def writableWritableConverter[T <: Writable]() =
def writableWritableConverter[T <: Writable](): WritableConverter[T] =
WritableConverter.writableWritableConverter()

/**
Expand Down Expand Up @@ -2017,15 +2017,15 @@ object WritableConverter {
simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](bw =>
simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
// getBytes method returns array which is longer then data to be returned
Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
)
}
}

implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)

implicit def writableWritableConverter[T <: Writable]() =
implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
}
Expand All @@ -221,10 +222,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
}

/**
Expand Down Expand Up @@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
implicit val ctag: ClassTag[K] = fakeClassTag
def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
// The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
implicit val ctag: ClassTag[U] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f))
}

Expand Down
62 changes: 60 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.api.java

import com.google.common.base.Optional

import scala.collection.convert.Wrappers.MapWrapper
import java.{util => ju}
import scala.collection.mutable

private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
Expand All @@ -32,7 +33,64 @@ private[spark] object JavaUtils {
def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
new SerializableMapWrapper(underlying)

// Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
// but implements java.io.Serializable. It can't just be subclassed to make it
// Serializable since the MapWrapper class has no no-arg constructor. This class
// doesn't need a no-arg constructor though.
class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
extends MapWrapper(underlying) with java.io.Serializable
extends ju.AbstractMap[A, B] with java.io.Serializable { self =>

override def size = underlying.size

override def get(key: AnyRef): B = try {
underlying get key.asInstanceOf[A] match {
case None => null.asInstanceOf[B]
case Some(v) => v
}
} catch {
case ex: ClassCastException => null.asInstanceOf[B]
}

override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
def size = self.size

def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
val ui = underlying.iterator
var prev : Option[A] = None

def hasNext = ui.hasNext

def next() = {
val (k, v) = ui.next
prev = Some(k)
new ju.Map.Entry[A, B] {
import scala.util.hashing.byteswap32
def getKey = k
def getValue = v
def setValue(v1 : B) = self.put(k, v1)
override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
override def equals(other: Any) = other match {
case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
case _ => false
}
}
}

def remove() {
prev match {
case Some(k) =>
underlying match {
case mm: mutable.Map[a, _] =>
mm remove k
prev = None
case _ =>
throw new UnsupportedOperationException("remove")
}
case _ =>
throw new IllegalStateException("next must be called at least once before remove")
}
}
}
}
}
}
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,11 @@ object SparkSubmit {
sysProps.getOrElseUpdate(k, v)
}

// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sysProps -= ("spark.driver.host")
}

// Resolve paths in certain spark properties
val pathConfigs = Seq(
"spark.jars",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ private[spark] class AppClient(
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
cores))
master ! ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None)
listener.executorAdded(fullId, workerId, hostPort, cores, memory)

case ExecutorUpdated(id, state, message, exitStatus) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

state = ExecutorState.RUNNING
worker ! ExecutorStateChanged(appId, execId, state, None, None)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
Expand Down
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import org.apache.spark.input.StreamFileInputFormat
import org.apache.spark.{ Partition, SparkContext }

private[spark] class BinaryFileRDD[T](
sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
sc: SparkContext,
inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
keyClass: Class[String],
valueClass: Class[T],
@transient conf: Configuration,
minPartitions: Int)
extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {

override def getPartitions: Array[Partition] = {
Expand Down
35 changes: 0 additions & 35 deletions core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala

This file was deleted.

34 changes: 0 additions & 34 deletions core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala

This file was deleted.

Loading

0 comments on commit 36e0753

Please sign in to comment.